Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional single-threaded feature to bevy_ecs/bevy_tasks #6690

Merged
merged 20 commits into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ filesystem_watcher = ["bevy_internal/filesystem_watcher"]

serialize = ["bevy_internal/serialize"]

# Disables all parallelism in the engine. Forces all engine tasks to run on a single thread.
single-threaded = ["bevy_internal/single-threaded"]

# Display server protocol support (X11 is enabled by default)
wayland = ["bevy_internal/wayland"]
x11 = ["bevy_internal/x11"]
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ categories = ["game-engines", "data-structures"]

[features]
trace = []
single-threaded = []
default = ["bevy_reflect"]

[dependencies]
Expand Down
24 changes: 24 additions & 0 deletions crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,28 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
);
}

#[inline]
#[cfg(feature = "single-threaded")]
pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w World,
_batch_size: usize,
func: FN,
) {
self.for_each(world, func);
}

#[inline]
#[cfg(feature = "single-threaded")]
pub fn par_for_each_mut<'w, FN: Fn(Q::Item<'w>) + Send + Sync + Clone>(
&mut self,
world: &'w mut World,
_batch_size: usize,
func: FN,
) {
self.for_each_mut(world, func);
}

/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries, see [`Self::par_for_each_mut`] for
Expand All @@ -842,6 +864,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
#[inline]
#[cfg(not(feature = "single-threaded"))]
pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w World,
Expand All @@ -867,6 +890,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
#[inline]
#[cfg(not(feature = "single-threaded"))]
pub fn par_for_each_mut<'w, FN: Fn(Q::Item<'w>) + Send + Sync + Clone>(
&mut self,
world: &'w mut World,
Expand Down
13 changes: 11 additions & 2 deletions crates/bevy_ecs/src/schedule/stage.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(not(feature = "single-threaded"))]
use crate::schedule::ParallelExecutor;
use crate::{
self as bevy_ecs,
change_detection::CHECK_TICK_THRESHOLD,
Expand All @@ -6,7 +8,7 @@ use crate::{
schedule::{
graph_utils::{self, DependencyGraphError},
BoxedRunCriteria, DuplicateLabelStrategy, ExclusiveInsertionPoint, GraphNode,
ParallelExecutor, ParallelSystemExecutor, RunCriteriaContainer, RunCriteriaDescriptor,
ParallelSystemExecutor, RunCriteriaContainer, RunCriteriaDescriptor,
RunCriteriaDescriptorOrLabel, RunCriteriaInner, RunCriteriaLabelId, ShouldRun,
SingleThreadedExecutor, SystemContainer, SystemDescriptor, SystemLabelId, SystemSet,
},
Expand Down Expand Up @@ -136,7 +138,14 @@ impl SystemStage {
}

pub fn parallel() -> Self {
Self::new(Box::<ParallelExecutor>::default())
#[cfg(not(feature = "single-threaded"))]
{
Self::new(Box::<ParallelExecutor>::default())
}
#[cfg(feature = "single-threaded")]
{
Self::single_threaded()
}
}

pub fn get_executor<T: ParallelSystemExecutor>(&self) -> Option<&T> {
Expand Down
20 changes: 20 additions & 0 deletions crates/bevy_ecs/src/system/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,15 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> Query<'w, 's, Q, F> {
};
}

#[cfg(feature = "single-threaded")]
pub fn par_for_each<'this>(
&'this self,
_batch_size: usize,
f: impl Fn(ROQueryItem<'this, Q>) + Send + Sync + Clone,
) {
self.for_each(f);
}

/// Runs `f` on each read-only query item in parallel.
///
/// Parallelization is achieved by using the [`World`]'s [`ComputeTaskPool`].
Expand All @@ -750,6 +759,7 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> Query<'w, 's, Q, F> {
///
/// - [`par_for_each_mut`](Self::par_for_each_mut) for operating on mutable query items.
#[inline]
#[cfg(not(feature = "single-threaded"))]
pub fn par_for_each<'this>(
&'this self,
batch_size: usize,
Expand All @@ -768,6 +778,15 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> Query<'w, 's, Q, F> {
};
}

#[cfg(feature = "single-threaded")]
pub fn par_for_each_mut<'a>(
&'a mut self,
_batch_size: usize,
f: impl Fn(Q::Item<'a>) + Send + Sync + Clone,
) {
self.for_each_mut(f);
}

/// Runs `f` on each read-only query item in parallel.
///
/// Parallelization is achieved by using the [`World`]'s [`ComputeTaskPool`].
Expand All @@ -783,6 +802,7 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> Query<'w, 's, Q, F> {
///
/// - [`par_for_each`](Self::par_for_each) for more usage details.
#[inline]
#[cfg(not(feature = "single-threaded"))]
pub fn par_for_each_mut<'a>(
&'a mut self,
batch_size: usize,
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ wav = ["bevy_audio/wav"]
filesystem_watcher = ["bevy_asset/filesystem_watcher"]

serialize = ["bevy_core/serialize", "bevy_input/serialize", "bevy_time/serialize", "bevy_window/serialize", "bevy_transform/serialize", "bevy_math/serialize", "bevy_scene/serialize"]
single-threaded = ["bevy_ecs/single-threaded", "bevy_tasks/single-threaded"]

# Display server protocol support (X11 is enabled by default)
wayland = ["bevy_winit/wayland"]
Expand Down
4 changes: 4 additions & 0 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ repository = "https://github.com/bevyengine/bevy"
license = "MIT OR Apache-2.0"
keywords = ["bevy"]

[features]
single-threaded = []
default = []

[dependencies]
futures-lite = "1.4.0"
async-executor = "1.3.0"
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
mod task;
pub use task::Task;

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), not(feature = "single-threaded")))]
mod task_pool;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), not(feature = "single-threaded")))]
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};

#[cfg(target_arch = "wasm32")]
#[cfg(any(not(target_arch = "wasm32"), feature = "single-threaded"))]
mod single_threaded_task_pool;
#[cfg(target_arch = "wasm32")]
#[cfg(any(not(target_arch = "wasm32"), feature = "single-threaded"))]
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};

mod usages;
Expand Down
74 changes: 50 additions & 24 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{
future::Future,
marker::PhantomData,
mem,
sync::{Arc, Mutex},
};
use std::{cell::RefCell, future::Future, marker::PhantomData, mem, rc::Rc};

thread_local! {
#[cfg(not(target_arch = "wasm32"))]
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
}

/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -57,7 +57,7 @@ impl TaskPool {
1
}

/// Allows spawning non-`static futures on the thread pool. The function takes a callback,
/// Allows spawning non-'static futures on the thread pool. The function takes a callback,
/// passing a scope object into it. The scope object provided to the callback can be used
/// to spawn tasks. This function will await the completion of all tasks before returning.
///
Expand All @@ -71,8 +71,9 @@ impl TaskPool {
let executor: &'env async_executor::LocalExecutor<'env> =
unsafe { mem::transmute(executor) };

let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };
let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
unsafe { mem::transmute(&results) };

let mut scope = Scope {
executor,
Expand All @@ -88,29 +89,36 @@ impl TaskPool {
// Loop until all tasks are done
while executor.try_tick() {}

let results = scope.results.lock().unwrap();
let results = scope.results.borrow();
results
.iter()
.map(|result| result.lock().unwrap().take().unwrap())
.map(|result| result.borrow_mut().take().unwrap())
.collect()
}

/// Spawns a static future onto the JS event loop. For now it is returning FakeTask
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
/// so some proxy future is needed. Moreover currently we don't have long-living
/// LocalExecutor here (above `spawn` implementation creates temporary one)
/// But for typical use cases it seems that current implementation should be sufficient:
/// caller can spawn long-running future writing results to some channel / event queue
/// and simply call detach on returned Task (like AssetServer does) - spawned future
/// can write results to some channel / event queue.
/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
/// cancelled and "detached" allowing it to continue running without having to be polled by the
/// end-user.
///
/// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
where
T: 'static,
{
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});

#[cfg(not(target_arch = "wasm32"))]
{
LOCAL_EXECUTOR.with(|executor| {
let _task = executor.spawn(future);
// Loop until all tasks are done
while executor.try_tick() {}
});
}

FakeTask
}

Expand All @@ -121,6 +129,24 @@ impl TaskPool {
{
self.spawn(future)
}

/// Runs a function with the local executor. Typically used to tick
/// the local executor on the main thread as it needs to share time with
/// other things.
///
/// ```rust
/// use bevy_tasks::TaskPool;
///
/// TaskPool::new().with_local_executor(|local_executor| {
/// local_executor.try_tick();
/// });
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
{
LOCAL_EXECUTOR.with(f)
}
}

#[derive(Debug)]
Expand All @@ -138,7 +164,7 @@ impl FakeTask {
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'env async_executor::LocalExecutor<'env>,
// Vector to gather results of all futures spawned during scope run
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,
results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
Expand All @@ -163,10 +189,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
let result = Arc::new(Mutex::new(None));
self.results.lock().unwrap().push(result.clone());
let result = Rc::new(RefCell::new(None));
self.results.borrow_mut().push(result.clone());
let f = async move {
result.lock().unwrap().replace(f.await);
result.borrow_mut().replace(f.await);
};
self.executor.spawn(f).detach();
}
Expand Down
1 change: 1 addition & 0 deletions docs/cargo_features.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
|mp3|MP3 audio format support.|
|wav|WAV audio format support.|
|serialize|Enables serialization of `bevy_input` types.|
|single-threaded|Disables all parallelism in the engine. All engine tasks run on a single thread. Does nothing on WASM.|
|wayland|Enable this to use Wayland display server protocol other than X11.|
|subpixel_glyph_atlas|Enable this to cache glyphs using subpixel accuracy. This increases texture memory usage as each position requires a separate sprite in the glyph atlas, but provide more accurate character spacing.|
|bevy_ci_testing|Used for running examples in CI.|
Expand Down