diff --git a/benches/benches/bevy_ecs/iteration/heavy_compute.rs b/benches/benches/bevy_ecs/iteration/heavy_compute.rs index 440d1bcb22f6f..f00a095da8eda 100644 --- a/benches/benches/bevy_ecs/iteration/heavy_compute.rs +++ b/benches/benches/bevy_ecs/iteration/heavy_compute.rs @@ -1,5 +1,5 @@ use bevy_ecs::prelude::*; -use bevy_tasks::{ComputeTaskPool, TaskPool}; +use bevy_tasks::TaskPool; use criterion::Criterion; use glam::*; @@ -19,9 +19,8 @@ pub fn heavy_compute(c: &mut Criterion) { let mut group = c.benchmark_group("heavy_compute"); group.warm_up_time(std::time::Duration::from_millis(500)); group.measurement_time(std::time::Duration::from_secs(4)); + TaskPool::init(TaskPool::default); group.bench_function("base", |b| { - ComputeTaskPool::init(TaskPool::default); - let mut world = World::default(); world.spawn_batch((0..1000).map(|_| { diff --git a/benches/benches/bevy_tasks/iter.rs b/benches/benches/bevy_tasks/iter.rs index 74b043f9a234e..7001e7cfcbb7f 100644 --- a/benches/benches/bevy_tasks/iter.rs +++ b/benches/benches/bevy_tasks/iter.rs @@ -1,4 +1,4 @@ -use bevy_tasks::{ParallelIterator, TaskPoolBuilder}; +use bevy_tasks::{ParallelIterator, TaskPool}; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; struct ParChunks<'a, T>(std::slice::Chunks<'a, T>); @@ -34,7 +34,7 @@ fn bench_overhead(c: &mut Criterion) { let mut v = (0..10000).collect::>(); let mut group = c.benchmark_group("overhead_par_iter"); for thread_count in &[1, 2, 4, 8, 16, 32] { - let pool = TaskPoolBuilder::new().num_threads(*thread_count).build(); + let pool = TaskPool::build().threads(*thread_count).build(); group.bench_with_input( BenchmarkId::new("threads", thread_count), thread_count, @@ -69,7 +69,7 @@ fn bench_for_each(c: &mut Criterion) { let mut v = (0..10000).collect::>(); let mut group = c.benchmark_group("for_each_par_iter"); for thread_count in &[1, 2, 4, 8, 16, 32] { - let pool = TaskPoolBuilder::new().num_threads(*thread_count).build(); + let 
pool = TaskPool::build().threads(*thread_count).build(); group.bench_with_input( BenchmarkId::new("threads", thread_count), thread_count, @@ -115,7 +115,7 @@ fn bench_many_maps(c: &mut Criterion) { let v = (0..10000).collect::>(); let mut group = c.benchmark_group("many_maps_par_iter"); for thread_count in &[1, 2, 4, 8, 16, 32] { - let pool = TaskPoolBuilder::new().num_threads(*thread_count).build(); + let pool = TaskPool::build().threads(*thread_count).build(); group.bench_with_input( BenchmarkId::new("threads", thread_count), thread_count, diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index 97c3ede26e623..2c46994fe6135 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -7,7 +7,7 @@ use crate::{ use anyhow::Result; use bevy_ecs::system::{Res, ResMut, Resource}; use bevy_log::warn; -use bevy_tasks::IoTaskPool; +use bevy_tasks::{TaskGroup, TaskPool}; use bevy_utils::{Entry, HashMap, Uuid}; use crossbeam_channel::TryRecvError; use parking_lot::{Mutex, RwLock}; @@ -448,8 +448,8 @@ impl AssetServer { pub(crate) fn load_untracked(&self, asset_path: AssetPath<'_>, force: bool) -> HandleId { let server = self.clone(); let owned_path = asset_path.to_owned(); - IoTaskPool::get() - .spawn(async move { + TaskPool::get() + .spawn(TaskGroup::IO, async move { if let Err(err) = server.load_async(owned_path, force).await { warn!("{}", err); } @@ -701,7 +701,7 @@ mod test { fn setup(asset_path: impl AsRef) -> AssetServer { use crate::FileAssetIo; - IoTaskPool::init(Default::default); + TaskPool::init(Default::default); AssetServer::new(FileAssetIo::new(asset_path, false)) } diff --git a/crates/bevy_asset/src/debug_asset_server.rs b/crates/bevy_asset/src/debug_asset_server.rs index 690dd6769dc7c..72f7f2856e461 100644 --- a/crates/bevy_asset/src/debug_asset_server.rs +++ b/crates/bevy_asset/src/debug_asset_server.rs @@ -8,7 +8,7 @@ use bevy_ecs::{ schedule::SystemLabel, system::{NonSendMut, 
Res, ResMut, Resource, SystemState}, }; -use bevy_tasks::{IoTaskPool, TaskPoolBuilder}; +use bevy_tasks::TaskPool; use bevy_utils::HashMap; use std::{ ops::{Deref, DerefMut}, @@ -67,9 +67,9 @@ impl Default for HandleMap { impl Plugin for DebugAssetServerPlugin { fn build(&self, app: &mut bevy_app::App) { - IoTaskPool::init(|| { - TaskPoolBuilder::default() - .num_threads(2) + TaskPool::init(|| { + TaskPool::build() + .max_threads(2) .thread_name("Debug Asset Server IO Task Pool".to_string()) .build() }); diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index fbc175c1e3f89..54d1d3df2ddc8 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -4,47 +4,44 @@ mod name; #[cfg(feature = "serialize")] mod serde; -mod task_pool_options; use bevy_ecs::system::Resource; pub use bytemuck::{bytes_of, cast_slice, Pod, Zeroable}; pub use name::*; -pub use task_pool_options::*; pub mod prelude { //! The Bevy Core Prelude. #[doc(hidden)] - pub use crate::{CorePlugin, Name, TaskPoolOptions}; + pub use crate::{CorePlugin, Name}; } use bevy_app::prelude::*; use bevy_ecs::entity::Entity; use bevy_reflect::{ReflectDeserialize, ReflectSerialize}; +use bevy_tasks::{TaskPool, TaskPoolBuilder}; use bevy_utils::{Duration, HashSet, Instant}; use std::borrow::Cow; use std::ops::Range; #[cfg(not(target_arch = "wasm32"))] use bevy_ecs::schedule::IntoSystemDescriptor; -#[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::tick_global_task_pools_on_main_thread; /// Adds core functionality to Apps. #[derive(Default)] pub struct CorePlugin { /// Options for the [`TaskPool`](bevy_tasks::TaskPool) created at application start. 
- pub task_pool_options: TaskPoolOptions, + pub task_pool_builder: TaskPoolBuilder, } impl Plugin for CorePlugin { fn build(&self, app: &mut App) { // Setup the default bevy task pools - self.task_pool_options.create_default_pools(); + TaskPool::init(|| self.task_pool_builder.clone().build()); #[cfg(not(target_arch = "wasm32"))] app.add_system_to_stage( bevy_app::CoreStage::Last, - tick_global_task_pools_on_main_thread.at_end(), + TaskPool::flush_local_tasks.at_end(), ); app.register_type::().register_type::(); @@ -111,38 +108,22 @@ pub struct FrameCount(pub u32); #[cfg(test)] mod tests { use super::*; - use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; + use bevy_tasks::prelude::TaskPool; #[test] fn runs_spawn_local_tasks() { let mut app = App::new(); app.add_plugin(CorePlugin::default()); - let (async_tx, async_rx) = crossbeam_channel::unbounded(); - AsyncComputeTaskPool::get() - .spawn_local(async move { - async_tx.send(()).unwrap(); - }) - .detach(); - - let (compute_tx, compute_rx) = crossbeam_channel::unbounded(); - ComputeTaskPool::get() - .spawn_local(async move { - compute_tx.send(()).unwrap(); - }) - .detach(); - - let (io_tx, io_rx) = crossbeam_channel::unbounded(); - IoTaskPool::get() + let (tx, rx) = crossbeam_channel::unbounded(); + TaskPool::get() .spawn_local(async move { - io_tx.send(()).unwrap(); + tx.send(()).unwrap(); }) .detach(); app.run(); - async_rx.try_recv().unwrap(); - compute_rx.try_recv().unwrap(); - io_rx.try_recv().unwrap(); + rx.try_recv().unwrap(); } } diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs deleted file mode 100644 index 4537354a69c05..0000000000000 --- a/crates/bevy_core/src/task_pool_options.rs +++ /dev/null @@ -1,153 +0,0 @@ -use bevy_ecs::prelude::Resource; -use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; -use bevy_utils::tracing::trace; - -/// Defines a simple way to determine how many threads to use 
given the number of remaining cores -/// and number of total cores -#[derive(Clone)] -pub struct TaskPoolThreadAssignmentPolicy { - /// Force using at least this many threads - pub min_threads: usize, - /// Under no circumstance use more than this many threads for this pool - pub max_threads: usize, - /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is - /// permitted to use 1.0 to try to use all remaining threads - pub percent: f32, -} - -impl TaskPoolThreadAssignmentPolicy { - /// Determine the number of threads to use for this task pool - fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize { - assert!(self.percent >= 0.0); - let mut desired = (total_threads as f32 * self.percent).round() as usize; - - // Limit ourselves to the number of cores available - desired = desired.min(remaining_threads); - - // Clamp by min_threads, max_threads. (This may result in us using more threads than are - // available, this is intended. An example case where this might happen is a device with - // <= 2 threads. - desired.clamp(self.min_threads, self.max_threads) - } -} - -/// Helper for configuring and creating the default task pools. 
For end-users who want full control, -/// set up [`CorePlugin`](super::CorePlugin) -#[derive(Clone, Resource)] -pub struct TaskPoolOptions { - /// If the number of physical cores is less than min_total_threads, force using - /// min_total_threads - pub min_total_threads: usize, - /// If the number of physical cores is greater than max_total_threads, force using - /// max_total_threads - pub max_total_threads: usize, - - /// Used to determine number of IO threads to allocate - pub io: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of async compute threads to allocate - pub async_compute: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of compute threads to allocate - pub compute: TaskPoolThreadAssignmentPolicy, -} - -impl Default for TaskPoolOptions { - fn default() -> Self { - TaskPoolOptions { - // By default, use however many cores are available on the system - min_total_threads: 1, - max_total_threads: std::usize::MAX, - - // Use 25% of cores for IO, at least 1, no more than 4 - io: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - }, - - // Use 25% of cores for async compute, at least 1, no more than 4 - async_compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - }, - - // Use all remaining cores for compute (at least 1) - compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: std::usize::MAX, - percent: 1.0, // This 1.0 here means "whatever is left over" - }, - } - } -} - -impl TaskPoolOptions { - /// Create a configuration that forces using the given number of threads. 
- pub fn with_num_threads(thread_count: usize) -> Self { - TaskPoolOptions { - min_total_threads: thread_count, - max_total_threads: thread_count, - ..Default::default() - } - } - - /// Inserts the default thread pools into the given resource map based on the configured values - pub fn create_default_pools(&self) { - let total_threads = bevy_tasks::available_parallelism() - .clamp(self.min_total_threads, self.max_total_threads); - trace!("Assigning {} cores to default task pools", total_threads); - - let mut remaining_threads = total_threads; - - { - // Determine the number of IO threads we will use - let io_threads = self - .io - .get_number_of_threads(remaining_threads, total_threads); - - trace!("IO Threads: {}", io_threads); - remaining_threads = remaining_threads.saturating_sub(io_threads); - - IoTaskPool::init(|| { - TaskPoolBuilder::default() - .num_threads(io_threads) - .thread_name("IO Task Pool".to_string()) - .build() - }); - } - - { - // Determine the number of async compute threads we will use - let async_compute_threads = self - .async_compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Async Compute Threads: {}", async_compute_threads); - remaining_threads = remaining_threads.saturating_sub(async_compute_threads); - - AsyncComputeTaskPool::init(|| { - TaskPoolBuilder::default() - .num_threads(async_compute_threads) - .thread_name("Async Compute Task Pool".to_string()) - .build() - }); - } - - { - // Determine the number of compute threads we will use - // This is intentionally last so that an end user can specify 1.0 as the percent - let compute_threads = self - .compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Compute Threads: {}", compute_threads); - - ComputeTaskPool::init(|| { - TaskPoolBuilder::default() - .num_threads(compute_threads) - .thread_name("Compute Task Pool".to_string()) - .build() - }); - } - } -} diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index 
a91ed6634f1b6..d0fa9923f6094 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -62,7 +62,7 @@ mod tests { system::Resource, world::{Mut, World}, }; - use bevy_tasks::{ComputeTaskPool, TaskPool}; + use bevy_tasks::TaskPool; use std::{ any::TypeId, marker::PhantomData, @@ -388,7 +388,7 @@ mod tests { #[test] fn par_for_each_dense() { - ComputeTaskPool::init(TaskPool::default); + TaskPool::init(TaskPool::default); let mut world = World::new(); let e1 = world.spawn(A(1)).id(); let e2 = world.spawn(A(2)).id(); @@ -410,7 +410,7 @@ mod tests { #[test] fn par_for_each_sparse() { - ComputeTaskPool::init(TaskPool::default); + TaskPool::init(TaskPool::default); let mut world = World::new(); let e1 = world.spawn(SparseStored(1)).id(); let e2 = world.spawn(SparseStored(2)).id(); diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 245ff7dacb7b4..3be289d4e433a 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -9,7 +9,7 @@ use crate::{ storage::TableId, world::{World, WorldId}, }; -use bevy_tasks::ComputeTaskPool; +use bevy_tasks::{TaskGroup, TaskPool}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use fixedbitset::FixedBitSet; @@ -839,7 +839,7 @@ impl QueryState { /// write-queries. /// /// # Panics - /// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being + /// The [`TaskPool`] is not initialized. If using this from a query that is being /// initialized and run from the ECS scheduler, this should never panic. #[inline] pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>( @@ -864,7 +864,7 @@ impl QueryState { /// Runs `func` on each query result in parallel. /// /// # Panics - /// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being + /// The [`TaskPool`] is not initialized. 
If using this from a query that is being /// initialized and run from the ECS scheduler, this should never panic. #[inline] pub fn par_for_each_mut<'w, FN: Fn(Q::Item<'w>) + Send + Sync + Clone>( @@ -891,7 +891,7 @@ impl QueryState { /// This can only be called for read-only queries. /// /// # Panics - /// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being + /// The [`TaskPool`] is not initialized. If using this from a query that is being /// initialized and run from the ECS scheduler, this should never panic. /// /// # Safety @@ -986,7 +986,7 @@ impl QueryState { /// iter() method, but cannot be chained like a normal [`Iterator`]. /// /// # Panics - /// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being + /// The [`TaskPool`] is not initialized. If using this from a query that is being /// initialized and run from the ECS scheduler, this should never panic. /// /// # Safety @@ -1008,7 +1008,7 @@ impl QueryState { ) { // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual - ComputeTaskPool::get().scope(|scope| { + TaskPool::get().scope(TaskGroup::Compute, |scope| { if Q::IS_DENSE && F::IS_DENSE { let tables = &world.storages().tables; for table_id in &self.matched_table_ids { diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 68dd1f1ea798d..66508e703dc51 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -5,7 +5,7 @@ use crate::{ world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{Scope, TaskGroup, TaskPool}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use 
event_listener::Event; @@ -124,7 +124,7 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - ComputeTaskPool::init(TaskPool::default).scope(|scope| { + TaskPool::init(TaskPool::default).scope(TaskGroup::Compute, |scope| { self.prepare_systems(scope, systems, world); if self.should_run.count_ones(..) == 0 { return; diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs index 573afa70aae51..117c69e244a26 100644 --- a/crates/bevy_ecs/src/system/commands/parallel_scope.rs +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -24,7 +24,6 @@ pub struct ParallelCommandsState { /// Example: /// ``` /// # use bevy_ecs::prelude::*; -/// # use bevy_tasks::ComputeTaskPool; /// # /// # #[derive(Component)] /// # struct Velocity; diff --git a/crates/bevy_ecs/src/system/query.rs b/crates/bevy_ecs/src/system/query.rs index 8f7e4f70bc303..88a8d11ba54b3 100644 --- a/crates/bevy_ecs/src/system/query.rs +++ b/crates/bevy_ecs/src/system/query.rs @@ -727,23 +727,23 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> Query<'w, 's, Q, F> { /// Runs `f` on each read-only query item in parallel. /// - /// Parallelization is achieved by using the [`World`]'s [`ComputeTaskPool`]. + /// Parallelization is achieved by using the global [`TaskPool`]. /// /// # Tasks and batch size /// /// The items in the query get sorted into batches. /// Internally, this function spawns a group of futures that each take on a `batch_size` sized section of the items (or less if the division is not perfect). - /// Then, the tasks in the [`ComputeTaskPool`] work through these futures. + /// Then, the tasks in the [`TaskPool`] work through these futures. /// /// You can use this value to tune between maximum multithreading ability (many small batches) and minimum parallelization overhead (few big batches). 
/// Rule of thumb: If the function body is (mostly) computationally expensive but there are not many items, a small batch size (=more batches) may help to even out the load. /// If the body is computationally cheap and you have many items, a large batch size (=fewer batches) avoids spawning additional futures that don't help to even out the load. /// - /// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool + /// [`TaskPool`]: bevy_tasks::prelude::TaskPool /// /// # Panics /// - /// This method panics if the [`ComputeTaskPool`] resource is added to the `World` before using this method. + /// This method panics if the [`TaskPool`] is not initialized. /// If using this from a query that is being initialized and run from the [`Schedule`](crate::schedule::Schedule), this never panics. /// /// # See also @@ -770,14 +770,14 @@ impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> Query<'w, 's, Q, F> { /// Runs `f` on each read-only query item in parallel. /// - /// Parallelization is achieved by using the [`World`]'s [`ComputeTaskPool`]. + /// Parallelization is achieved by using the global [`TaskPool`]. /// /// # Panics /// - /// This method panics if the [`ComputeTaskPool`] resource is added to the `World` before using this method. + /// This method panics if the [`TaskPool`] is not initialized. /// If using this from a query that is being initialized and run from the [`Schedule`](crate::schedule::Schedule), this never panics. 
/// - /// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool + /// [`TaskPool`]: bevy_tasks::prelude::TaskPool /// /// # See also /// diff --git a/crates/bevy_gltf/src/loader.rs b/crates/bevy_gltf/src/loader.rs index 5459c37d5dd9a..6bf99776a8847 100644 --- a/crates/bevy_gltf/src/loader.rs +++ b/crates/bevy_gltf/src/loader.rs @@ -31,7 +31,7 @@ use bevy_render::{ }; use bevy_scene::Scene; #[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::IoTaskPool; +use bevy_tasks::{TaskGroup, TaskPool}; use bevy_transform::components::Transform; use bevy_utils::{HashMap, HashSet}; @@ -408,8 +408,8 @@ async fn load_gltf<'a, 'b>( } } else { #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() - .scope(|scope| { + TaskPool::get() + .scope(TaskGroup::IO, |scope| { gltf.textures().for_each(|gltf_texture| { let linear_textures = &linear_textures; let load_context: &LoadContext = load_context; diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index f86fb78d3bc23..68a41a5720808 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,15 +9,20 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [dependencies] -futures-lite = "1.4.0" -async-executor = "1.3.0" -async-channel = "1.4.2" -async-task = "4.2.0" +concurrent-queue = "1.2" +async-task = "4.2" once_cell = "1.7" -concurrent-queue = "1.2.2" [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +futures-lite = "1.4.0" +tracing = "0.1" +st3 = "0.3" +fastrand = "1.1" +parking_lot = "0.12" +event-listener = "2.5" + [dev-dependencies] instant = { version = "0.1", features = ["wasm-bindgen"] } diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 8a74034e0ca90..7a6a47e922bf8 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -1,17 +1,17 @@ -use bevy_tasks::TaskPoolBuilder; +use 
bevy_tasks::{TaskGroup, TaskPool}; -// This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin -// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical +// This sample demonstrates creating a thread pool with 4 compute threads and spawning 40 tasks that +// spin for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical // cores) fn main() { - let pool = TaskPoolBuilder::new() - .thread_name("Busy Behavior ThreadPool".to_string()) - .num_threads(4) + let pool = TaskPool::build() + .threads(4) + .thread_name("Busy Behavior ThreadPool") .build(); let t0 = instant::Instant::now(); - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { for i in 0..40 { s.spawn(async move { let now = instant::Instant::now(); diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs index daa2eaf2e2a89..83273c9a0c402 100644 --- a/crates/bevy_tasks/examples/idle_behavior.rs +++ b/crates/bevy_tasks/examples/idle_behavior.rs @@ -1,15 +1,15 @@ -use bevy_tasks::TaskPoolBuilder; +use bevy_tasks::{TaskGroup, TaskPool}; // This sample demonstrates a thread pool with one thread per logical core and only one task // spinning. Other than the one thread, the system should remain idle, demonstrating good behavior // for small workloads. 
fn main() { - let pool = TaskPoolBuilder::new() - .thread_name("Idle Behavior ThreadPool".to_string()) + let pool = TaskPool::build() + .thread_name("Idle Behavior ThreadPool") .build(); - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { for i in 0..1 { s.spawn(async move { println!("Blocking for 10 seconds"); diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs new file mode 100644 index 0000000000000..d3e531d1bcc73 --- /dev/null +++ b/crates/bevy_tasks/src/executor.rs @@ -0,0 +1,546 @@ +// Forked from async_executor + +use crate::TaskPool; +use parking_lot::Mutex; +use std::future::Future; +use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Poll, Waker}; + +use async_task::Runnable; +use concurrent_queue::ConcurrentQueue; +use futures_lite::{future, prelude::*}; +use st3::{Stealer, Worker, B512}; + +#[doc(no_inline)] +pub use async_task::Task; + +/// An async executor. +#[derive(Debug)] +pub struct Executor<'a> { + /// The executor state. + state: Arc, + + /// Makes the `'a` lifetime invariant. + _marker: PhantomData>, +} + +unsafe impl Send for Executor<'_> {} +unsafe impl Sync for Executor<'_> {} + +impl UnwindSafe for Executor<'_> {} +impl RefUnwindSafe for Executor<'_> {} + +impl<'a> Executor<'a> { + /// Creates a new executor. + #[inline] + pub fn new(thread_counts: &[usize]) -> Executor<'a> { + Executor { + state: Arc::new(State::new(thread_counts)), + _marker: PhantomData, + } + } + + /// Spawns a task onto the executor. + pub fn spawn( + &self, + priority: usize, + future: impl Future + Send + 'a, + ) -> Task { + // Create the task and schedule it + let (runnable, task) = + unsafe { async_task::spawn_unchecked(future, self.schedule(priority)) }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. 
+ pub fn try_tick(&self, priority: usize) -> bool { + let group = &self.state.groups[priority]; + match group.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + group.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + /// Runs the executor until the given future completes. + pub async fn run( + &self, + priority: usize, + thread_id: usize, + future: impl Future, + ) -> T { + let mut runner = Runner::new(priority, thread_id, &self.state); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable().await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self, priority: usize) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state.clone(); + + move |runnable| { + let group = &state.groups[priority]; + group.queue.push(runnable).unwrap(); + group.notify(); + } + } +} + +impl Drop for Executor<'_> { + fn drop(&mut self) { + for group in self.state.groups.iter() { + let mut sleepers = group.sleepers.lock(); + for (_, waker) in sleepers.wakers.drain(..) { + waker.wake(); + } + drop(sleepers); + while group.queue.pop().is_ok() {} + } + } +} + +/// The state of a executor. +#[derive(Debug)] +struct State { + groups: Box<[GroupState]>, +} + +impl State { + /// Creates state for a new executor. 
+ fn new(thread_counts: &[usize]) -> State { + let groups = thread_counts + .iter() + .map(|count| { + let workers = (0..*count) + .map(|_| Worker::::new()) + .collect::>(); + let stealers = workers + .iter() + .map(|w| w.stealer()) + .collect::>() + .into_boxed_slice(); + // TODO: This is a hack only for initialization + // probably should refactor it out. + let available = ConcurrentQueue::bounded(*count.max(&1)); + for worker in workers { + available.push(worker).unwrap(); + } + GroupState { + queue: ConcurrentQueue::unbounded(), + stealers, + available, + searchers: AtomicUsize::new(0), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + } + }) + .collect::>(); + State { + groups: groups.into(), + } + } +} + +#[derive(Debug)] +struct GroupState { + /// The global queue. + queue: ConcurrentQueue, + + /// Local queues created by runners. + stealers: Box<[Stealer]>, + + available: ConcurrentQueue>, + + searchers: AtomicUsize, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + sleepers: Mutex, +} + +impl GroupState { + /// Notifies a sleeping ticker. Returns true if the notification search + /// should continue. + #[inline] + fn notify(&self) -> bool { + if self + .notified + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + let (waker, should_continue) = { + let mut sleepers = self.sleepers.lock(); + let waker = sleepers.notify(); + (waker, sleepers.is_empty()) + }; + if let Some(w) = waker { + w.wake(); + } + return should_continue; + } + false + } + + /// Attempt to start a new search over the queues. Returns + /// false if there are too many searchers, in which case the runner + /// should abort the search.
+ #[inline] + fn start_search(&self) -> bool { + let searchers = self.searchers.load(Ordering::Acquire); + if 2 * searchers > self.stealers.len() { + return false; + } + self.searchers.fetch_add(1, Ordering::Release); + true + } + + /// Ends a search. Returns true if this is the last searcher, in which + /// case the runner should wake and notify other runners. + #[inline] + fn end_search(&self) -> bool { + self.searchers.fetch_sub(1, Ordering::Release) == 1 + } +} + +/// A list of sleeping tickers. +#[derive(Debug)] +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec<(usize, Waker)>, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + self.count += 1; + let id = self.free_ids.pop().unwrap_or(self.count); + self.wakers.push((id, waker.clone())); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.0 == id { + if !item.1.will_wake(waker) { + item.1 = waker.clone(); + } + return false; + } + } + + self.wakers.push((id, waker.clone())); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].0 == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
+ fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + #[inline] + fn is_empty(&self) -> bool { + self.wakers.is_empty() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.1) + } else { + None + } + } +} + +/// A worker in a work-stealing executor. +/// +/// This is just a ticker that also has an associated local queue for improved cache locality. +#[derive(Debug)] +struct Runner<'a> { + priority: usize, + thread_id: usize, + + /// The executor state. + group: &'a GroupState, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, + + state: &'a State, + /// The local queue. + worker: Worker, + rng: fastrand::Rng, + /// Bumped every time a runnable task is found. + ticks: usize, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(priority: usize, thread_id: usize, state: &State) -> Runner<'_> { + let group = &state.groups[priority]; + let worker = group.available.pop().unwrap(); + Runner { + priority, + thread_id, + state, + sleeping: 0, + worker, + group, + rng: fastrand::Rng::new(), + ticks: 0, + } + } + + fn priority_iter(&self) -> impl Iterator { + // Prioritize the immediate responsibility of the runner, then search in reverse order + std::iter::once(self.priority).chain((self.priority + 1..self.state.groups.len()).rev()) + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self.group.sleepers.lock(); + + match self.sleeping { + // Move to sleeping state. 
+ 0 => { + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + if !sleepers.update(id, waker) { + return false; + } + } + } + self.group + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + let id = self.sleeping; + self.sleeping = 0; + if id != 0 { + let mut sleepers = self.group.sleepers.lock(); + sleepers.remove(id); + + self.group + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + } + } + + /// Waits for the next runnable task to run. + async fn runnable(&mut self) -> Runnable { + let runnable = future::poll_fn(|cx| { + loop { + // Try the local queue. + if let Some(r) = self.worker.pop() { + self.wake_and_notify(); + return Poll::Ready(r); + } + + // Try the local task queue. + let local_r = TaskPool::LOCAL_EXECUTOR.with(|local| local.try_fetch()); + if let Some(r) = local_r { + self.wake_and_notify(); + return Poll::Ready(r); + } + + // Try stealing from global queues. + for priority in self.priority_iter() { + let group = &self.state.groups[priority]; + let stealers = &self.state.groups[priority].stealers; + + if !group.start_search() { + continue; + } + + if let Ok(r) = group.queue.pop() { + self.steal(&group.queue); + if group.end_search() { + self.wake_and_notify(); + } + return Poll::Ready(r); + } + + // // Pick a random starting point in the iterator list and rotate the list. + if !stealers.is_empty() { + let start = self.rng.usize(..stealers.len()); + // Try stealing from each local queue in the list. + for idx in start..start + stealers.len() { + let idx = idx % stealers.len(); + let stealer = &stealers[idx]; + if priority == self.priority && idx == self.thread_id { + continue; + } + // Limit the number of higher priority tasks stolen to avoid taking + // too many. Higher priority threads can't steal these tasks back. 
+ // + // Only steal enough such that every other thread in the local priority + // can steal one task. + let limit = if priority > self.priority { + self.group.stealers.len() + } else { + usize::MAX + }; + // Take half of the victim's queue (rounded up), but never more than `limit`. + let count_fn = |n: usize| ((n + 1) / 2).min(limit); + if let Ok((r, _)) = stealer.steal_and_pop(&self.worker, count_fn) { + if group.end_search() { + self.wake_and_notify(); + } + return Poll::Ready(r); + } + } + } + + group.end_search(); + } + + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + }) + .await; + + // Bump the tick counter. + self.ticks += 1; + if self.ticks % 64 == 0 { + // Steal tasks from the global queue to ensure fair task scheduling. + self.steal(&self.state.groups[self.priority].queue); + } + + runnable + } + + fn wake_and_notify(&mut self) { + // Wake up. + self.wake(); + // Notify another ticker now to pick up where this ticker left off, just in + // case running the new task takes a long time. + for group in self.state.groups[..=self.priority].iter().rev() { + if !group.notify() { + return; + } + } + } + + /// Steals items from `src` into the local queue until `src` is drained or the + /// local queue has no spare capacity left. + fn steal(&self, src: &ConcurrentQueue) { + // Only do any work when there is actually something to steal. + if !src.is_empty() { + // Don't steal more than fits into the queue. + for _ in 0..self.worker.spare_capacity() { + let Ok(t) = src.pop() else { break }; + let res = self.worker.push(t); + debug_assert!(res.is_ok()); + } + } + } +} + +impl Drop for Runner<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + let id = self.sleeping; + self.sleeping = 0; + if id != 0 { + let mut sleepers = self.group.sleepers.lock(); + let notified = sleepers.remove(id); + + self.group + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + + // If this ticker was notified, then notify another ticker.
+ if notified { + drop(sleepers); + self.group.notify(); + } + } + + // Re-schedule remaining tasks in the local queue. + while let Some(r) = self.worker.pop() { + r.schedule(); + } + } +} + +/// Runs a closure when dropped. +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 952bc53075551..4d2be1931b25d 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,4 +1,4 @@ -use crate::TaskPool; +use crate::{TaskGroup, TaskPool}; mod adapters; pub use adapters::*; @@ -34,7 +34,7 @@ where /// /// See [`Iterator::count()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.count) fn count(mut self, pool: &TaskPool) -> usize { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.count() }); } @@ -105,7 +105,7 @@ where where F: FnMut(BatchIter::Item) + Send + Clone + Sync, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { @@ -195,7 +195,7 @@ where C: std::iter::FromIterator, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.collect::>() }); } @@ -216,7 +216,7 @@ where BatchIter::Item: Send + 'static, { let (mut a, mut b) = <(C, C)>::default(); - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.partition::, F>(newf) }); @@ -242,7 +242,7 @@ where F: FnMut(C, BatchIter::Item) -> C + Send + Sync + Clone, C: Clone + Send + Sync + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); let newi = init.clone(); @@ -260,7 +260,7 @@ where where F: 
FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(mut batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.all(newf) }); @@ -279,7 +279,7 @@ where where F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(mut batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.any(newf) }); @@ -299,7 +299,7 @@ where where F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, { - let poses = pool.scope(|s| { + let poses = pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let mut newf = f.clone(); s.spawn(async move { @@ -332,7 +332,7 @@ where where BatchIter::Item: Ord + Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.max() }); } @@ -349,7 +349,7 @@ where where BatchIter::Item: Ord + Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.min() }); } @@ -368,7 +368,7 @@ where F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.max_by_key(newf) }); @@ -388,7 +388,7 @@ where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.max_by(newf) }); @@ -408,7 +408,7 @@ where F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() 
{ let newf = f.clone(); s.spawn(async move { batch.min_by_key(newf) }); @@ -428,7 +428,7 @@ where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone, BatchIter::Item: Send + 'static, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { let newf = f.clone(); s.spawn(async move { batch.min_by(newf) }); @@ -482,7 +482,7 @@ where S: std::iter::Sum + Send + 'static, R: std::iter::Sum, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.sum() }); } @@ -499,7 +499,7 @@ where S: std::iter::Product + Send + 'static, R: std::iter::Product, { - pool.scope(|s| { + pool.scope(TaskGroup::Compute, |s| { while let Some(batch) = self.next_batch() { s.spawn(async move { batch.product() }); } diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 802f6c267b7cf..b3860535ff598 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -1,12 +1,17 @@ #![warn(missing_docs)] #![doc = include_str!("../README.md")] +mod local_executor; mod slice; pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; -pub use task::Task; +pub use task::{Task, TaskGroup}; +#[cfg(not(target_arch = "wasm32"))] +mod executor; +#[cfg(not(target_arch = "wasm32"))] +mod simple_executor; #[cfg(not(target_arch = "wasm32"))] mod task_pool; #[cfg(not(target_arch = "wasm32"))] @@ -17,21 +22,22 @@ mod single_threaded_task_pool; #[cfg(target_arch = "wasm32")] pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder}; -mod usages; -#[cfg(not(target_arch = "wasm32"))] -pub use usages::tick_global_task_pools_on_main_thread; -pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; +mod task_pool_builder; mod iter; pub use iter::ParallelIterator; #[allow(missing_docs)] pub mod prelude { + #[cfg(target_arch = "wasm32")] + pub use crate::single_threaded_task_pool::TaskPool; + 
#[cfg(not(target_arch = "wasm32"))] + pub use crate::task_pool::TaskPool; #[doc(hidden)] pub use crate::{ iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, - usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, + TaskGroup, }; } diff --git a/crates/bevy_tasks/src/local_executor.rs b/crates/bevy_tasks/src/local_executor.rs new file mode 100644 index 0000000000000..e3a0fec977b33 --- /dev/null +++ b/crates/bevy_tasks/src/local_executor.rs @@ -0,0 +1,76 @@ +use async_task::{Runnable, Task}; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::future::Future; +use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::rc::Rc; + +/// A thread-local executor. +/// +/// The executor can only be run on the thread that created it. +#[derive(Debug)] +pub struct LocalExecutor<'a> { + queue: RefCell>, + + /// Makes the type `!Send` and `!Sync`. + _marker: PhantomData<&'a Rc<()>>, +} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl<'a> LocalExecutor<'a> { + /// Creates a single-threaded executor. + pub fn new() -> LocalExecutor<'a> { + LocalExecutor { + queue: RefCell::new(VecDeque::new()), + _marker: PhantomData, + } + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + 'a) -> Task { + // SAFETY: The spawned Task can only be progressed via `try_tick` which must be accessed + // from the thread that owns the executor and the task. + // + // Even if the returned Task and waker are sent to another thread, the associated inner + // task is only dropped when `try_tick` is triggered. + let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) }; + self.queue.borrow_mut().push_back(runnable); + task + } + + /// Attempts to fetch a task if at least one is scheduled. 
+ #[inline] + pub fn try_fetch(&self) -> Option { + self.queue.borrow_mut().pop_front() + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + #[inline] + pub fn try_tick(&self) -> bool { + match self.try_fetch() { + None => false, + Some(runnable) => { + runnable.run(); + true + } + } + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + '_ { + move |runnable| { + self.queue.borrow_mut().push_back(runnable); + } + } +} + +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { + LocalExecutor::new() + } +} diff --git a/crates/bevy_tasks/src/simple_executor.rs b/crates/bevy_tasks/src/simple_executor.rs new file mode 100644 index 0000000000000..589f6e4f80331 --- /dev/null +++ b/crates/bevy_tasks/src/simple_executor.rs @@ -0,0 +1,61 @@ +use async_task::{Runnable, Task}; +use concurrent_queue::ConcurrentQueue; +use std::future::Future; +use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; + +/// A simple MPSC executor. +/// +/// The executor can be run on any thread and enqueue tasks from any thread. +/// All enqueued tasks must be [`Send`]. Internally only has one queue and +/// does not do any form of work stealing. +#[derive(Debug)] +pub struct SimpleExecutor<'a> { + queue: ConcurrentQueue, + _marker: PhantomData<&'a ()>, +} + +impl UnwindSafe for SimpleExecutor<'_> {} +impl RefUnwindSafe for SimpleExecutor<'_> {} + +impl<'a> SimpleExecutor<'a> { + /// Creates a single-threaded executor. + pub fn new() -> SimpleExecutor<'a> { + SimpleExecutor { + queue: ConcurrentQueue::unbounded(), + _marker: PhantomData, + } + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { + // SAFETY: The provided future is Send and scoped to the lifetime of the executor. 
+ // + // Even if the returned Task and waker are sent to another thread, the associated inner + // task is only dropped when `try_tick` is triggered. + let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) }; + self.queue.push(runnable).unwrap(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + #[inline] + pub fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + runnable.run(); + true + } + } + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + '_ + Send + Sync { + move |runnable| { + self.queue.push(runnable).unwrap(); + } + } +} diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 8fa37f4f2361b..a49c7c6db67c7 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,3 +1,7 @@ +use crate::local_executor::LocalExecutor; +pub use crate::task_pool_builder::TaskPoolBuilder; +use crate::TaskGroup; +use once_cell::sync::OnceCell; use std::{ future::Future, marker::PhantomData, @@ -5,50 +9,50 @@ use std::{ sync::{Arc, Mutex}, }; -/// Used to create a TaskPool -#[derive(Debug, Default, Clone)] -pub struct TaskPoolBuilder {} - -impl TaskPoolBuilder { - /// Creates a new TaskPoolBuilder instance - pub fn new() -> Self { - Self::default() - } - - /// No op on the single threaded task pool - pub fn num_threads(self, _num_threads: usize) -> Self { - self - } - - /// No op on the single threaded task pool - pub fn stack_size(self, _stack_size: usize) -> Self { - self - } - - /// No op on the single threaded task pool - pub fn thread_name(self, _thread_name: String) -> Self { - self - } - - /// Creates a new [`TaskPool`] - pub fn build(self) -> TaskPool { - TaskPool::new_internal() - } -} 
+static GLOBAL_TASK_POOL: OnceCell = OnceCell::new(); /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. In this case - main thread only. +/// +/// # Scheduling Semantics +/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async +/// Compute. Compute is higher priority than IO, which are both higher priority than async compute. +/// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize +/// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would +/// otherwise be sitting idle. +/// +/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and +/// async compute thread groups, but any IO task will take precedence over any compute task on the IO +/// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread. +/// +/// By default, all threads in the pool are dedicated to compute group. Thread counts can be altered +/// via [`TaskPoolBuilder`] when constructing the pool. #[derive(Debug, Default, Clone)] pub struct TaskPool {} impl TaskPool { + /// Initializes the global [`TaskPool`] instance. + pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self { + GLOBAL_TASK_POOL.get_or_init(f) + } + + /// Gets the global [`TaskPool`] instance. + /// + /// # Panics + /// Panics if no pool has been initialized yet. + pub fn get() -> &'static Self { + GLOBAL_TASK_POOL.get().expect( + "A TaskPool has not been initialized yet. Please call \ + TaskPool::init beforehand.", + ) + } + /// Create a `TaskPool` with the default configuration. pub fn new() -> Self { TaskPoolBuilder::new().build() } - #[allow(unused_variables)] - fn new_internal() -> Self { + pub(crate) fn new_internal(_: TaskPoolBuilder) -> Self { Self {} } @@ -62,14 +66,13 @@ impl TaskPool { /// to spawn tasks.
This function will await the completion of all tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` - pub fn scope<'env, F, T>(&self, f: F) -> Vec + pub fn scope<'env, F, T>(&self, _: TaskGroup, f: F) -> Vec where F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), T: Send + 'static, { - let executor = &async_executor::LocalExecutor::new(); - let executor: &'env async_executor::LocalExecutor<'env> = - unsafe { mem::transmute(executor) }; + let executor = &LocalExecutor::new(); + let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) }; let results: Mutex>>>> = Mutex::new(Vec::new()); let results: &'env Mutex>>>> = unsafe { mem::transmute(&results) }; @@ -97,14 +100,15 @@ impl TaskPool { /// Spawns a static future onto the JS event loop. For now it is returning FakeTask /// instance with no-op detach method. Returning real Task is possible here, but tricky: - /// future is running on JS event loop, Task is running on async_executor::LocalExecutor + /// future is running on JS event loop, Task is running on LocalExecutor /// so some proxy future is needed. Moreover currently we don't have long-living /// LocalExecutor here (above `spawn` implementation creates temporary one) /// But for typical use cases it seems that current implementation should be sufficient: /// caller can spawn long-running future writing results to some channel / event queue /// and simply call detach on returned Task (like AssetServer does) - spawned future /// can write results to some channel / event queue. - pub fn spawn(&self, future: impl Future + 'static) -> FakeTask + #[inline] + pub fn spawn(&self, _: TaskGroup, future: impl Future + 'static) -> FakeTask where T: 'static, { @@ -119,7 +123,7 @@ impl TaskPool { where T: 'static, { - self.spawn(future) + self.spawn(TaskGroup::Compute, future) } } @@ -136,7 +140,7 @@ impl FakeTask { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'env async_executor::LocalExecutor<'env>, + executor: &'env LocalExecutor<'env>, // Vector to gather results of all futures spawned during scope run results: &'env Mutex>>>>, diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index 4b5d875ea989b..69c45819ebddb 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,4 +1,4 @@ -use super::TaskPool; +use super::{TaskGroup, TaskPool}; /// Provides functions for mapping read-only slices across a provided [`TaskPool`]. pub trait ParallelSlice: AsRef<[T]> { @@ -13,7 +13,7 @@ pub trait ParallelSlice: AsRef<[T]> { /// ```rust /// # use bevy_tasks::prelude::*; /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); + /// let task_pool = TaskPool::default(); /// let counts = (0..10000).collect::>(); /// let incremented = counts.par_chunk_map(&task_pool, 100, |chunk| { /// let mut results = Vec::new(); @@ -37,7 +37,7 @@ pub trait ParallelSlice: AsRef<[T]> { { let slice = self.as_ref(); let f = &f; - task_pool.scope(|scope| { + task_pool.scope(TaskGroup::Compute, |scope| { for chunk in slice.chunks(chunk_size) { scope.spawn(async move { f(chunk) }); } @@ -57,7 +57,7 @@ pub trait ParallelSlice: AsRef<[T]> { /// ```rust /// # use bevy_tasks::prelude::*; /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); + /// let task_pool = TaskPool::default(); /// let counts = (0..10000).collect::>(); /// let incremented = counts.par_splat_map(&task_pool, None, |chunk| { /// let mut results = Vec::new(); @@ -107,7 +107,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { /// ```rust /// # use bevy_tasks::prelude::*; /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); + /// let task_pool = TaskPool::default(); /// let mut counts = (0..10000).collect::>(); /// let incremented = counts.par_chunk_map_mut(&task_pool, 100, |chunk| { /// let mut results = Vec::new(); @@ -134,7 
+134,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { { let slice = self.as_mut(); let f = &f; - task_pool.scope(|scope| { + task_pool.scope(TaskGroup::Compute, |scope| { for chunk in slice.chunks_mut(chunk_size) { scope.spawn(async move { f(chunk) }); } @@ -154,7 +154,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { /// ```rust /// # use bevy_tasks::prelude::*; /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); + /// let task_pool = TaskPool::default(); /// let mut counts = (0..10000).collect::>(); /// let incremented = counts.par_splat_map_mut(&task_pool, None, |chunk| { /// let mut results = Vec::new(); @@ -206,8 +206,8 @@ mod tests { #[test] fn test_par_chunks_map() { let v = vec![42; 1000]; - let task_pool = TaskPool::new(); - let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); + let task_pool = TaskPool::init(TaskPool::default); + let outputs = v.par_splat_map(task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); let mut sum = 0; for output in outputs { @@ -220,9 +220,9 @@ mod tests { #[test] fn test_par_chunks_map_mut() { let mut v = vec![42; 1000]; - let task_pool = TaskPool::new(); + let task_pool = TaskPool::init(TaskPool::default); - let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 { + let outputs = v.par_splat_map_mut(task_pool, None, |numbers| -> i32 { for number in numbers.iter_mut() { *number *= 2; } diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index 360909c2e9671..af81fc92aa500 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -4,6 +4,46 @@ use std::{ task::{Context, Poll}, }; +/// A group a task is assigned to upon being spawned. +/// +/// By default, `Compute` is used for [`TaskPool::spawn`]. +/// +/// [`TaskPool::spawn`]: crate::TaskPool::spawn +#[derive(Clone, Copy, Debug)] +pub enum TaskGroup { + /// CPU-bound, short-lived, latency-sensitive tasks. Does not need + /// to yield regularly. 
Should not hold the thread indefinitely or at the + /// very minimum should not hold a thread longer than the course of a frame. + Compute, + /// IO-bound, potentially long lasting tasks that readily yield any incoming or + /// outbound communication. Usually used for loading assets or network communication. + /// + /// If IO threads are sitting idle, they may run `Compute` tasks if the compute threads + /// are at capacity. + IO, + /// CPU-bound, long-lived tasks. Can hold the thread for very long periods (longer than + /// a single frame). + /// + /// If async compute threads are sitting idle, they may run `Compute` or `IO` tasks if the + /// respective threads are at capacity. + AsyncCompute, +} + +impl TaskGroup { + // This is unused on wasm32 platforms. + #[allow(dead_code)] + pub(crate) const MAX_PRIORITY: usize = Self::Compute.to_priority() + 1; + + // This is unused on wasm32 platforms. + pub(crate) const fn to_priority(self) -> usize { + match self { + Self::AsyncCompute => 0, + Self::IO => 1, + Self::Compute => 2, + } + } +} + /// Wraps `async_executor::Task`, a spawned future. /// /// Tasks are also futures themselves and yield the output of the spawned future.
@@ -15,11 +55,11 @@ use std::{ /// Wraps `async_executor::Task` #[derive(Debug)] #[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."] -pub struct Task(async_executor::Task); +pub struct Task(async_task::Task); impl Task { /// Creates a new task from a given `async_executor::Task` - pub fn new(task: async_executor::Task) -> Self { + pub fn new(task: async_task::Task) -> Self { Self(task) } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 39308c2bfd7bc..6c8a1e8499c1c 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,3 +1,12 @@ +pub use crate::task_pool_builder::*; +use crate::{ + executor::Executor, local_executor::LocalExecutor, simple_executor::SimpleExecutor, Task, + TaskGroup, +}; +use concurrent_queue::ConcurrentQueue; +use event_listener::Event; +use futures_lite::{future, pin}; +use once_cell::sync::OnceCell; use std::{ future::Future, marker::PhantomData, @@ -7,142 +16,141 @@ use std::{ thread::{self, JoinHandle}, }; -use concurrent_queue::ConcurrentQueue; -use futures_lite::{future, pin, FutureExt}; - -use crate::Task; - -/// Used to create a [`TaskPool`] -#[derive(Debug, Default, Clone)] -#[must_use] -pub struct TaskPoolBuilder { - /// If set, we'll set up the thread pool to use at most `num_threads` threads. - /// Otherwise use the logical core count of the system - num_threads: Option, - /// If set, we'll use the given stack size rather than the system default - stack_size: Option, - /// Allows customizing the name of the threads - helpful for debugging. If set, threads will - /// be named (), i.e. "MyThreadPool (2)" - thread_name: Option, -} - -impl TaskPoolBuilder { - /// Creates a new [`TaskPoolBuilder`] instance - pub fn new() -> Self { - Self::default() - } - - /// Override the number of threads created for the pool. 
If unset, we default to the number - /// of logical cores of the system - pub fn num_threads(mut self, num_threads: usize) -> Self { - self.num_threads = Some(num_threads); - self - } +static GLOBAL_TASK_POOL: OnceCell = OnceCell::new(); - /// Override the stack size of the threads created for the pool - pub fn stack_size(mut self, stack_size: usize) -> Self { - self.stack_size = Some(stack_size); - self - } - - /// Override the name of the threads created for the pool. If set, threads will - /// be named ` ()`, i.e. `MyThreadPool (2)` - pub fn thread_name(mut self, thread_name: String) -> Self { - self.thread_name = Some(thread_name); - self - } - - /// Creates a new [`TaskPool`] based on the current options. - pub fn build(self) -> TaskPool { - TaskPool::new_internal( - self.num_threads, - self.stack_size, - self.thread_name.as_deref(), - ) - } +#[derive(Debug, Default)] +struct Groups { + compute: usize, + async_compute: usize, + io: usize, } /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. +/// +/// # Scheduling Semantics +/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async +/// Compute. Compute is higher priority than IO, which are both higher priority than async compute. +/// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize +/// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would +/// otherwise be sitting idle. +/// +/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and +/// async compute thread groups, but any IO task will take precedence over any compute task on the IO +/// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread. +/// +/// By default, all threads in the pool are dedicated to compute group. 
Thread counts can be altered +/// via [`TaskPoolBuilder`] when constructing the pool. +/// +/// # Drop Behavior +/// Dropping the task pool will immediately cancel all scheduled tasks and join all threads contained +/// within. #[derive(Debug)] pub struct TaskPool { - /// The executor for the pool - /// - /// This has to be separate from TaskPoolInner because we have to create an `Arc` to - /// pass into the worker threads, and we must create the worker threads before we can create - /// the `Vec>` contained within `TaskPoolInner` - executor: Arc>, + /// Inner state of the pool + executor: Arc>, /// Inner state of the pool + groups: Groups, threads: Vec>, - shutdown_tx: async_channel::Sender<()>, + shutdown: Arc, } impl TaskPool { thread_local! { - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new(); + pub(crate) static LOCAL_EXECUTOR: LocalExecutor<'static> = LocalExecutor::new(); } - /// Create a `TaskPool` with the default configuration. - pub fn new() -> Self { - TaskPoolBuilder::new().build() + /// Initializes the global [`TaskPool`] instance. + pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self { + GLOBAL_TASK_POOL.get_or_init(f) } - fn new_internal( - num_threads: Option, - stack_size: Option, - thread_name: Option<&str>, - ) -> Self { - let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - - let executor = Arc::new(async_executor::Executor::new()); - - let num_threads = num_threads.unwrap_or_else(crate::available_parallelism); - - let threads = (0..num_threads) - .map(|i| { - let ex = Arc::clone(&executor); - let shutdown_rx = shutdown_rx.clone(); - - let thread_name = if let Some(thread_name) = thread_name { - format!("{thread_name} ({i})") - } else { - format!("TaskPool ({i})") - }; - let mut thread_builder = thread::Builder::new().name(thread_name); + /// Gets the global [`TaskPool`] instance. + /// + /// # Panics + /// Panics if no pool has been initialized yet.
+ pub fn get() -> &'static Self { + GLOBAL_TASK_POOL.get().expect( + "A TaskPool has not been initialized yet. Please call \ + TaskPool::init beforehand.", + ) + } - if let Some(stack_size) = stack_size { - thread_builder = thread_builder.stack_size(stack_size); - } + /// Get a [`TaskPoolBuilder`] for custom configuration. + pub fn build() -> TaskPoolBuilder { + TaskPoolBuilder::new() + } - thread_builder - .spawn(move || { - TaskPool::LOCAL_EXECUTOR.with(|local_executor| { - loop { - let res = std::panic::catch_unwind(|| { - let tick_forever = async move { - loop { - local_executor.tick().await; - } - }; - future::block_on(ex.run(tick_forever.or(shutdown_rx.recv()))) - }); - if let Ok(value) = res { - // Use unwrap_err because we expect a Closed error - value.unwrap_err(); - break; - } - } - }); - }) - .expect("Failed to spawn thread.") - }) - .collect(); + pub(crate) fn new_internal(builder: TaskPoolBuilder) -> Self { + let shutdown = Arc::new(Event::new()); + let mut groups = Groups::default(); + let total_threads = crate::available_parallelism() + .clamp(builder.min_total_threads, builder.max_total_threads); + tracing::trace!("Assigning {} cores to default task pools", total_threads); + + let mut remaining_threads = total_threads; + + // Determine the number of IO threads we will use + groups.io = builder + .io + .get_number_of_threads(remaining_threads, total_threads); + + tracing::trace!("IO Threads: {}", groups.io); + remaining_threads = remaining_threads.saturating_sub(groups.io); + + // Determine the number of async compute threads we will use + groups.async_compute = builder + .async_compute + .get_number_of_threads(remaining_threads, total_threads); + + tracing::trace!("Async Compute Threads: {}", groups.async_compute); + remaining_threads = remaining_threads.saturating_sub(groups.async_compute); + + // Determine the number of compute threads we will use + // This is intentionally last so that an end user can specify 1.0 as the percent + groups.compute = 
builder + .compute + .get_number_of_threads(remaining_threads, total_threads); + tracing::trace!("Compute Threads: {}", groups.compute); + + let mut thread_counts = vec![0; TaskGroup::MAX_PRIORITY]; + thread_counts[TaskGroup::Compute.to_priority()] = groups.compute; + thread_counts[TaskGroup::IO.to_priority()] = groups.io; + thread_counts[TaskGroup::AsyncCompute.to_priority()] = groups.async_compute; + let executor = Arc::new(Executor::new(&thread_counts)); + let thread_groups = [ + (groups.compute, "Compute", TaskGroup::Compute.to_priority()), + (groups.io, "IO", TaskGroup::IO.to_priority()), + ( + groups.async_compute, + "Async Compute", + TaskGroup::AsyncCompute.to_priority(), + ), + ]; + let mut threads = Vec::with_capacity(total_threads); + for (count, name, priority) in thread_groups { + for i in 0..count { + let shutdown = Arc::clone(&shutdown); + let executor = executor.clone(); + let thread = make_thread_builder(&builder, name, i).spawn(move || loop { + let shutdown_listener = shutdown.listen(); + let res = std::panic::catch_unwind(|| { + future::block_on(executor.run(priority, i, shutdown_listener)); + }); + if res.is_ok() { + break; + } + }); + threads.push(thread.expect("Failed to spawn thread.")); + } + } Self { executor, + groups, threads, - shutdown_tx, + shutdown, } } @@ -151,20 +159,31 @@ impl TaskPool { self.threads.len() } - /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, - /// passing a scope object into it. The scope object provided to the callback can be used - /// to spawn tasks. This function will await the completion of all tasks before returning. 
+ /// Return the number of threads that can run a given [`TaskGroup`] in the task pool + pub fn thread_count_for(&self, group: TaskGroup) -> usize { + let groups = &self.groups; + match group { + TaskGroup::Compute => self.thread_num(), + TaskGroup::IO => groups.io + groups.async_compute, + TaskGroup::AsyncCompute => groups.async_compute, + } + } + + /// Allows spawning non-`'static` futures on the thread pool in a specific task group. The + /// function takes a callback, passing a scope object into it. The scope object provided + /// to the callback can be used to spawn tasks. This function will await the completion of + /// all tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` /// /// # Example /// /// ``` - /// use bevy_tasks::TaskPool; + /// use bevy_tasks::{TaskPool, TaskGroup}; /// - /// let pool = TaskPool::new(); + /// let pool = TaskPool::init(TaskPool::default); /// let mut x = 0; - /// let results = pool.scope(|s| { + /// let results = pool.scope(TaskGroup::Compute, |s| { /// s.spawn(async { /// // you can borrow the spawner inside a task and spawn tasks from within the task /// s.spawn(async { @@ -184,7 +203,7 @@ impl TaskPool { /// assert!(results.contains(&1)); /// /// // The ordering is deterministic if you only spawn directly from the closure function. - /// let results = pool.scope(|s| { + /// let results = pool.scope(TaskGroup::Compute, |s| { /// s.spawn(async { 0 }); /// s.spawn(async { 1 }); /// }); @@ -205,11 +224,11 @@ impl TaskPool { /// Thus this lifetime must outlive `'scope`. /// /// ```compile_fail - /// use bevy_tasks::TaskPool; + /// use bevy_tasks::{TaskPool, TaskGroup}; /// fn scope_escapes_closure() { - /// let pool = TaskPool::new(); + /// let pool = TaskPool::init(TaskPool::default); /// let foo = Box::new(42); - /// pool.scope(|scope| { + /// pool.scope(TaskGroup::Compute, |scope| { /// std::thread::spawn(move || { /// // UB. 
This could spawn on the scope after `.scope` returns and the internal Scope is dropped. /// scope.spawn(async move { @@ -221,10 +240,10 @@ impl TaskPool { /// ``` /// /// ```compile_fail - /// use bevy_tasks::TaskPool; + /// use bevy_tasks::{TaskPool, TaskGroup}; /// fn cannot_borrow_from_closure() { - /// let pool = TaskPool::new(); - /// pool.scope(|scope| { + /// let pool = TaskPool::init(TaskPool::default); + /// pool.scope(TaskGroup::Compute, |scope| { /// let x = 1; /// let y = &x; /// scope.spawn(async move { @@ -233,26 +252,32 @@ impl TaskPool { /// }); /// } /// - pub fn scope<'env, F, T>(&self, f: F) -> Vec + pub fn scope<'env, F, T>(&self, group: TaskGroup, f: F) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { + if self.thread_count_for(group) == 0 { + tracing::error!("Attempting to use TaskPool::scope with the {:?} task group, but there are no threads for it!", + group); + } + // SAFETY: This safety comment applies to all references transmuted to 'env. // Any futures spawned with these references need to return before this function completes. // This is guaranteed because we drive all the futures spawned onto the Scope // to completion in this function. However, rust has no way of knowing this so we // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. 
- let executor: &async_executor::Executor = &self.executor; - let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; - let task_scope_executor = &async_executor::Executor::default(); - let task_scope_executor: &'env async_executor::Executor = + let executor: &Executor = &self.executor; + let executor: &'env Executor = unsafe { mem::transmute(executor) }; + let task_scope_executor = &SimpleExecutor::new(); + let task_scope_executor: &'env SimpleExecutor = unsafe { mem::transmute(task_scope_executor) }; - let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); - let spawned_ref: &'env ConcurrentQueue> = + let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); + let spawned_ref: &'env ConcurrentQueue> = unsafe { mem::transmute(&spawned) }; let scope = Scope { + group, executor, task_scope_executor, spawned: spawned_ref, @@ -292,29 +317,38 @@ impl TaskPool { // complete. (If the caller of scope() happens to be a thread in // this thread pool, and we only have one thread in the pool, then // simply calling future::block_on(spawned) would deadlock.) - let mut spawned = task_scope_executor.spawn(get_results); + let mut spawned = executor.spawn(group.to_priority(), get_results); loop { if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { break result; }; - self.executor.try_tick(); + self.executor.try_tick(group.to_priority()); task_scope_executor.try_tick(); } } } - /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be - /// cancelled and "detached" allowing it to continue running without having to be polled by the - /// end-user. + /// Spawns a static future onto the thread pool in a group. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// by the end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. 
- pub fn spawn(&self, future: impl Future + Send + 'static) -> Task + #[inline] + pub fn spawn( + &self, + group: TaskGroup, + future: impl Future + Send + 'static, + ) -> Task where T: Send + 'static, { - Task::new(self.executor.spawn(future)) + if self.thread_count_for(group) == 0 { + tracing::error!("Attempted to use TaskPool::spawn with the {:?} task group, but there are no threads for it!", + group); + } + Task::new(self.executor.spawn(group.to_priority(), future)) } /// Spawns a static future on the thread-local async executor for the current thread. The task @@ -329,34 +363,23 @@ impl TaskPool { Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) } - /// Runs a function with the local executor. Typically used to tick - /// the local executor on the main thread as it needs to share time with - /// other things. - /// - /// ```rust - /// use bevy_tasks::TaskPool; - /// - /// TaskPool::new().with_local_executor(|local_executor| { - /// local_executor.try_tick(); - /// }); - /// ``` - pub fn with_local_executor(&self, f: F) -> R - where - F: FnOnce(&async_executor::LocalExecutor) -> R, - { - Self::LOCAL_EXECUTOR.with(f) + /// Flushes all local tasks on the current thread from the `TaskPool`. + /// This function will continue running until the local executor for the + /// current thread is empty. + pub fn flush_local_tasks() { + Self::LOCAL_EXECUTOR.with(|local_executor| while local_executor.try_tick() {}); } } impl Default for TaskPool { fn default() -> Self { - Self::new() + TaskPoolBuilder::new().build() } } impl Drop for TaskPool { fn drop(&mut self) { - self.shutdown_tx.close(); + self.shutdown.notify_additional_relaxed(usize::MAX); let panicking = thread::panicking(); for join_handle in self.threads.drain(..) { @@ -373,9 +396,10 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::Executor<'scope>, - task_scope_executor: &'scope async_executor::Executor<'scope>, - spawned: &'scope ConcurrentQueue>, + group: TaskGroup, + executor: &'scope Executor<'scope>, + task_scope_executor: &'scope SimpleExecutor<'scope>, + spawned: &'scope ConcurrentQueue>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, @@ -391,7 +415,7 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'scope + Send>(&self, f: Fut) { - let task = self.executor.spawn(f); + let task = self.executor.spawn(self.group.to_priority(), f); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbouded queue, so it is safe to unwrap self.spawned.push(task).unwrap(); @@ -411,10 +435,32 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { } } +fn make_thread_builder( + builder: &TaskPoolBuilder, + prefix: &'static str, + idx: usize, +) -> thread::Builder { + let mut thread_builder = { + let thread_name = if let Some(ref thread_name) = builder.thread_name { + format!("{} ({}, {})", thread_name, prefix, idx) + } else { + format!("TaskPool ({}, {})", prefix, idx) + }; + thread::Builder::new().name(thread_name) + }; + + if let Some(stack_size) = builder.stack_size { + thread_builder = thread_builder.stack_size(stack_size); + } + + thread_builder +} + #[cfg(test)] #[allow(clippy::disallowed_types)] mod tests { use super::*; + use crate::TaskGroup; use std::sync::{ atomic::{AtomicBool, AtomicI32, Ordering}, Barrier, @@ -422,14 +468,14 @@ mod tests { #[test] fn test_spawn() { - let pool = TaskPool::new(); + let pool = TaskPool::init(TaskPool::default); let foo = Box::new(42); let foo = &*foo; let count = Arc::new(AtomicI32::new(0)); - let outputs = pool.scope(|scope| { + let outputs = 
pool.scope(TaskGroup::Compute, |scope| { for _ in 0..100 { let count_clone = count.clone(); scope.spawn(async move { @@ -452,8 +498,8 @@ mod tests { } #[test] - fn test_mixed_spawn_on_scope_and_spawn() { - let pool = TaskPool::new(); + fn test_mixed_spawn_local_and_spawn() { + let pool = TaskPool::init(TaskPool::default); let foo = Box::new(42); let foo = &*foo; @@ -461,7 +507,7 @@ mod tests { let local_count = Arc::new(AtomicI32::new(0)); let non_local_count = Arc::new(AtomicI32::new(0)); - let outputs = pool.scope(|scope| { + let outputs = pool.scope(TaskGroup::Compute, |scope| { for i in 0..100 { if i % 2 == 0 { let count_clone = non_local_count.clone(); @@ -498,7 +544,7 @@ mod tests { #[test] fn test_thread_locality() { - let pool = Arc::new(TaskPool::new()); + let pool = TaskPool::init(TaskPool::default); let count = Arc::new(AtomicI32::new(0)); let barrier = Arc::new(Barrier::new(101)); let thread_check_failed = Arc::new(AtomicBool::new(false)); @@ -506,10 +552,9 @@ mod tests { for _ in 0..100 { let inner_barrier = barrier.clone(); let count_clone = count.clone(); - let inner_pool = pool.clone(); let inner_thread_check_failed = thread_check_failed.clone(); std::thread::spawn(move || { - inner_pool.scope(|scope| { + pool.scope(TaskGroup::Compute, |scope| { let inner_count_clone = count_clone.clone(); scope.spawn(async move { inner_count_clone.fetch_add(1, Ordering::Release); @@ -535,14 +580,14 @@ mod tests { #[test] fn test_nested_spawn() { - let pool = TaskPool::new(); + let pool = TaskPool::init(TaskPool::default); let foo = Box::new(42); let foo = &*foo; let count = Arc::new(AtomicI32::new(0)); - let outputs: Vec = pool.scope(|scope| { + let outputs: Vec = pool.scope(TaskGroup::Compute, |scope| { for _ in 0..10 { let count_clone = count.clone(); scope.spawn(async move { @@ -573,7 +618,7 @@ mod tests { #[test] fn test_nested_locality() { - let pool = Arc::new(TaskPool::new()); + let pool = TaskPool::init(TaskPool::default); let count = 
Arc::new(AtomicI32::new(0)); let barrier = Arc::new(Barrier::new(101)); let thread_check_failed = Arc::new(AtomicBool::new(false)); @@ -581,10 +626,9 @@ mod tests { for _ in 0..100 { let inner_barrier = barrier.clone(); let count_clone = count.clone(); - let inner_pool = pool.clone(); let inner_thread_check_failed = thread_check_failed.clone(); std::thread::spawn(move || { - inner_pool.scope(|scope| { + pool.scope(TaskGroup::Compute, |scope| { let spawner = std::thread::current().id(); let inner_count_clone = count_clone.clone(); scope.spawn(async move { diff --git a/crates/bevy_tasks/src/task_pool_builder.rs b/crates/bevy_tasks/src/task_pool_builder.rs new file mode 100644 index 0000000000000..7315fb0e87d3f --- /dev/null +++ b/crates/bevy_tasks/src/task_pool_builder.rs @@ -0,0 +1,183 @@ +use crate::TaskPool; + +/// Defines a simple way to determine how many threads to use given the number of remaining cores +/// and number of total cores +#[derive(Debug, Clone)] +pub struct TaskGroupBuilder { + /// Force using at least this many threads + pub(crate) min_threads: usize, + /// Under no circumstance use more than this many threads for this pool + pub(crate) max_threads: usize, + /// Target using this percentage of total cores, clamped by min_threads and max_threads. 
It is + /// permitted to use 1.0 to try to use all remaining threads + pub(crate) percent: f32, +} + +impl TaskGroupBuilder { + /// Force using exactly this many threads + pub fn threads(&mut self, thread_count: usize) -> &mut Self { + self.min_threads(thread_count).max_threads(thread_count) + } + + /// Force using at least this many threads + pub fn min_threads(&mut self, thread_count: usize) -> &mut Self { + self.min_threads = thread_count; + self + } + + /// Under no circumstance use more than this many threads for this pool + pub fn max_threads(&mut self, thread_count: usize) -> &mut Self { + self.max_threads = thread_count; + self + } + + /// Target using this percentage of total cores in the range `[0.0, 1.0]`, clamped by + /// `min_threads` and `max_threads`. Use 1.0 to try to use all remaining threads. + pub fn percent(&mut self, percent: f32) -> &mut Self { + self.percent = percent; + self + } + + /// Determine the number of threads to use for this task pool + #[allow(dead_code)] // This is unused on wasm32 platforms + pub(crate) fn get_number_of_threads( + &self, + remaining_threads: usize, + total_threads: usize, + ) -> usize { + assert!(self.percent >= 0.0); + let mut desired = (total_threads as f32 * self.percent).round() as usize; + + // Limit ourselves to the number of cores available + desired = desired.min(remaining_threads); + + // Clamp by min_threads, max_threads. (This may result in us using more threads than are + // available, this is intended. An example case where this might happen is a device with + // <= 2 threads. 
+ desired.clamp(self.min_threads, self.max_threads) + } +} + +/// Used to create a [`TaskPool`] +#[derive(Debug, Clone)] +#[must_use] +pub struct TaskPoolBuilder { + /// If the number of physical cores is less than min_total_threads, force using + /// min_total_threads + pub(crate) min_total_threads: usize, + /// If the number of physical cores is grater than max_total_threads, force using + /// max_total_threads + pub(crate) max_total_threads: usize, + + /// Used to determine number of IO threads to allocate + pub(crate) io: TaskGroupBuilder, + /// Used to determine number of async compute threads to allocate + pub(crate) async_compute: TaskGroupBuilder, + /// Used to determine number of compute threads to allocate + pub(crate) compute: TaskGroupBuilder, + /// If set, we'll use the given stack size rather than the system default + pub(crate) stack_size: Option, + /// Allows customizing the name of the threads - helpful for debugging. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + pub(crate) thread_name: Option, +} + +impl Default for TaskPoolBuilder { + fn default() -> Self { + Self { + // By default, use however many cores are available on the system + min_total_threads: 1, + max_total_threads: std::usize::MAX, + + stack_size: None, + thread_name: None, + + // Use 25% of cores for IO, at least 1, no more than 4 + io: TaskGroupBuilder { + min_threads: 1, + max_threads: 4, + percent: 0.25, + }, + + // Use 25% of cores for async compute, at least 1, no more than 4 + async_compute: TaskGroupBuilder { + min_threads: 1, + max_threads: 4, + percent: 0.25, + }, + + // Use all remaining cores for compute (at least 1) + compute: TaskGroupBuilder { + min_threads: 1, + max_threads: std::usize::MAX, + percent: 1.0, // This 1.0 here means "whatever is left over" + }, + } + } +} + +impl TaskPoolBuilder { + /// Creates a new [`TaskPoolBuilder`] instance + pub fn new() -> Self { + Self::default() + } + + /// Force using exactly this many threads + pub fn 
threads(self, thread_count: usize) -> Self { + self.min_threads(thread_count).max_threads(thread_count) + } + + /// Force using at least this many threads + pub fn min_threads(mut self, thread_count: usize) -> Self { + self.min_total_threads = thread_count; + self + } + + /// Under no circumstance use more than this many threads for this pool + pub fn max_threads(mut self, thread_count: usize) -> Self { + self.max_total_threads = thread_count; + self + } + + /// Configure the group options for [`TaskGroup::Compute`]. + /// + /// [`TaskGroup::Compute`]: crate::TaskGroup::Compute + pub fn compute(mut self, builder: F) -> Self { + builder(&mut self.compute); + self + } + + /// Configure the group options for [`TaskGroup::AsyncCompute`]. + /// + /// [`TaskGroup::AsyncCompute`]: crate::TaskGroup::AsyncCompute + pub fn async_compute(mut self, builder: F) -> Self { + builder(&mut self.async_compute); + self + } + + /// Configure the group options for [`TaskGroup::IO`]. + /// + /// [`TaskGroup::IO`]: crate::TaskGroup::IO + pub fn io(mut self, builder: F) -> Self { + builder(&mut self.io); + self + } + + /// Override the name of the threads created for the pool. If set, threads will + /// be named ` (, )`, i.e. `MyThreadPool (IO, 2)` + pub fn thread_name(mut self, name: impl Into) -> Self { + self.thread_name = Some(name.into()); + self + } + + /// Override the stack size of the threads created for the pool + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } + + /// Creates a new [`TaskPool`] based on the current options. + pub fn build(self) -> TaskPool { + TaskPool::new_internal(self) + } +} diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs deleted file mode 100644 index 1d0c83b271c2f..0000000000000 --- a/crates/bevy_tasks/src/usages.rs +++ /dev/null @@ -1,137 +0,0 @@ -//! Definitions for a few common task pools that we want. Generally the determining factor for what -//! 
kind of work should go in each pool is latency requirements. -//! -//! For CPU-intensive work (tasks that generally spin until completion) we have a standard -//! [`ComputeTaskPool`] and an [`AsyncComputeTaskPool`]. Work that does not need to be completed to -//! present the next frame should go to the [`AsyncComputeTaskPool`] -//! -//! For IO-intensive work (tasks that spend very little time in a "woken" state) we have an IO -//! task pool. The tasks here are expected to complete very quickly. Generally they should just -//! await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready -//! for consumption. (likely via channels) - -use super::TaskPool; -use once_cell::sync::OnceCell; -use std::ops::Deref; - -static COMPUTE_TASK_POOL: OnceCell = OnceCell::new(); -static ASYNC_COMPUTE_TASK_POOL: OnceCell = OnceCell::new(); -static IO_TASK_POOL: OnceCell = OnceCell::new(); - -/// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next -/// frame -#[derive(Debug)] -pub struct ComputeTaskPool(TaskPool); - -impl ComputeTaskPool { - /// Initializes the global [`ComputeTaskPool`] instance. - pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self { - COMPUTE_TASK_POOL.get_or_init(|| Self(f())) - } - - /// Gets the global [`ComputeTaskPool`] instance. - /// - /// # Panics - /// Panics if no pool has been initialized yet. - pub fn get() -> &'static Self { - COMPUTE_TASK_POOL.get().expect( - "A ComputeTaskPool has not been initialized yet. Please call \ - ComputeTaskPool::init beforehand.", - ) - } -} - -impl Deref for ComputeTaskPool { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// A newtype for a task pool for CPU-intensive work that may span across multiple frames -#[derive(Debug)] -pub struct AsyncComputeTaskPool(TaskPool); - -impl AsyncComputeTaskPool { - /// Initializes the global [`AsyncComputeTaskPool`] instance. 
- pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self { - ASYNC_COMPUTE_TASK_POOL.get_or_init(|| Self(f())) - } - - /// Gets the global [`AsyncComputeTaskPool`] instance. - /// - /// # Panics - /// Panics if no pool has been initialized yet. - pub fn get() -> &'static Self { - ASYNC_COMPUTE_TASK_POOL.get().expect( - "A AsyncComputeTaskPool has not been initialized yet. Please call \ - AsyncComputeTaskPool::init beforehand.", - ) - } -} - -impl Deref for AsyncComputeTaskPool { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a -/// "woken" state) -#[derive(Debug)] -pub struct IoTaskPool(TaskPool); - -impl IoTaskPool { - /// Initializes the global [`IoTaskPool`] instance. - pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self { - IO_TASK_POOL.get_or_init(|| Self(f())) - } - - /// Gets the global [`IoTaskPool`] instance. - /// - /// # Panics - /// Panics if no pool has been initialized yet. - pub fn get() -> &'static Self { - IO_TASK_POOL.get().expect( - "A IoTaskPool has not been initialized yet. Please call \ - IoTaskPool::init beforehand.", - ) - } -} - -impl Deref for IoTaskPool { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Used by `bevy_core` to tick the global tasks pools on the main thread. -/// This will run a maximum of 100 local tasks per executor per call to this function. 
-#[cfg(not(target_arch = "wasm32"))] -pub fn tick_global_task_pools_on_main_thread() { - COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|compute_local_executor| { - ASYNC_COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|async_local_executor| { - IO_TASK_POOL - .get() - .unwrap() - .with_local_executor(|io_local_executor| { - for _ in 0..100 { - compute_local_executor.try_tick(); - async_local_executor.try_tick(); - io_local_executor.try_tick(); - } - }); - }); - }); -} diff --git a/examples/app/thread_pool_resources.rs b/examples/app/thread_pool_resources.rs index 3c76e98dfb2bb..31aa980764103 100644 --- a/examples/app/thread_pool_resources.rs +++ b/examples/app/thread_pool_resources.rs @@ -2,11 +2,12 @@ //! certain number of threads). use bevy::prelude::*; +use bevy::tasks::TaskPoolBuilder; fn main() { App::new() .add_plugins(DefaultPlugins.set(CorePlugin { - task_pool_options: TaskPoolOptions::with_num_threads(4), + task_pool_builder: TaskPoolBuilder::new().threads(4), })) .run(); } diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index d0f4466d76eaf..5c9c856ee66c6 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -3,7 +3,7 @@ use bevy::{ prelude::*, - tasks::{AsyncComputeTaskPool, Task}, + tasks::{Task, TaskGroup, TaskPool}, }; use futures_lite::future; use rand::Rng; @@ -52,12 +52,12 @@ struct ComputeTransform(Task); /// system, `handle_tasks`, will poll the spawned tasks on subsequent /// frames/ticks, and use the results to spawn cubes fn spawn_tasks(mut commands: Commands) { - let thread_pool = AsyncComputeTaskPool::get(); + let thread_pool = TaskPool::get(); for x in 0..NUM_CUBES { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { - // Spawn new task on the AsyncComputeTaskPool - let task = thread_pool.spawn(async move { + // Spawn new task on the TaskPool + let task = thread_pool.spawn(TaskGroup::AsyncCompute, async move { let mut rng = 
rand::thread_rng(); let start_time = Instant::now(); let duration = Duration::from_secs_f32(rng.gen_range(0.05..0.2)); diff --git a/examples/scene/scene.rs b/examples/scene/scene.rs index a3210214ad7b9..be56c14711b16 100644 --- a/examples/scene/scene.rs +++ b/examples/scene/scene.rs @@ -2,7 +2,11 @@ use std::fs::File; use std::io::Write; -use bevy::{prelude::*, tasks::IoTaskPool, utils::Duration}; +use bevy::{ + prelude::*, + tasks::{TaskGroup, TaskPool}, + utils::Duration, +}; fn main() { App::new() @@ -112,8 +116,8 @@ fn save_scene_system(world: &mut World) { // as they are blocking // This can't work in WASM as there is no filesystem access #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() - .spawn(async move { + TaskPool::get() + .spawn(TaskGroup::IO, async move { // Write the scene RON data to file File::create(format!("assets/{NEW_SCENE_FILE_PATH}")) .and_then(|mut file| file.write(serialized_scene.as_bytes()))