Skip to content

Commit

Permalink
Task System for Bevy (bevyengine#384)
Browse files Browse the repository at this point in the history
Add bevy_tasks crate to replace rayon
  • Loading branch information
lachlansneff authored and mrk-its committed Oct 6, 2020
1 parent 9f7de20 commit ada462c
Show file tree
Hide file tree
Showing 22 changed files with 847 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ bevy_text = { path = "crates/bevy_text", version = "0.1" }
bevy_ui = { path = "crates/bevy_ui", version = "0.1" }
bevy_utils = { path = "crates/bevy_utils", version = "0.1" }
bevy_window = { path = "crates/bevy_window", version = "0.1" }
bevy_tasks = { path = "crates/bevy_tasks", version = "0.1" }

# bevy (optional)
bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" }
Expand Down
4 changes: 3 additions & 1 deletion crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ keywords = ["bevy"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
bevy_math = { path = "../bevy_math", version = "0.1" }

# other
libloading = "0.6"
log = { version = "0.4", features = ["release_max_level_info"] }
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"]}
8 changes: 7 additions & 1 deletion crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::app_builder::AppBuilder;
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};

#[allow(clippy::needless_doctest_main)]
Expand Down Expand Up @@ -63,6 +63,12 @@ impl App {
}

pub fn run(mut self) {
// Setup the default bevy task pools
self.resources
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);

self.startup_schedule.initialize(&mut self.resources);
self.startup_executor.run(
&mut self.startup_schedule,
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;

pub use app::*;
pub use app_builder::*;
pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;

pub mod prelude {
pub use crate::{
Expand Down
147 changes: 147 additions & 0 deletions crates/bevy_app/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
pub struct TaskPoolThreadAssignmentPolicy {
/// Force using at least this many threads
pub min_threads: usize,
/// Under no circumstance use more than this many threads for this pool
pub max_threads: usize,
/// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
/// permitted to use 1.0 to try to use all remaining threads
pub percent: f32,
}

impl TaskPoolThreadAssignmentPolicy {
/// Determine the number of threads to use for this task pool
fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
assert!(self.percent >= 0.0);
let mut desired = (total_threads as f32 * self.percent).round() as usize;

// Limit ourselves to the number of cores available
desired = desired.min(remaining_threads);

// Clamp by min_threads, max_threads. (This may result in us using more threads than are
// available, this is intended. An example case where this might happen is a device with
// <= 2 threads.
bevy_math::clamp(desired, self.min_threads, self.max_threads)
}
}

/// Helper for configuring and creating the default task pools. For end-users who want full control,
/// insert the default task pools into the resource map manually. If the pools are already inserted,
/// this helper will do nothing.
#[derive(Clone)]
pub struct DefaultTaskPoolOptions {
/// If the number of physical cores is less than min_total_threads, force using min_total_threads
pub min_total_threads: usize,
/// If the number of physical cores is grater than max_total_threads, force using max_total_threads
pub max_total_threads: usize,

/// Used to determine number of IO threads to allocate
pub io: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of async compute threads to allocate
pub async_compute: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of compute threads to allocate
pub compute: TaskPoolThreadAssignmentPolicy,
}

impl Default for DefaultTaskPoolOptions {
fn default() -> Self {
DefaultTaskPoolOptions {
// By default, use however many cores are available on the system
min_total_threads: 1,
max_total_threads: std::usize::MAX,

// Use 25% of cores for IO, at least 1, no more than 4
io: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use 25% of cores for async compute, at least 1, no more than 4
async_compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use all remaining cores for compute (at least 1)
compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
},
}
}
}

impl DefaultTaskPoolOptions {
/// Create a configuration that forces using the given number of threads.
pub fn with_num_threads(thread_count: usize) -> Self {
let mut options = Self::default();
options.min_total_threads = thread_count;
options.max_total_threads = thread_count;

options
}

/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
let total_threads = bevy_math::clamp(
bevy_tasks::logical_core_count(),
self.min_total_threads,
self.max_total_threads,
);

let mut remaining_threads = total_threads;

if !resources.contains::<IOTaskPool>() {
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= io_threads;

resources.insert(IOTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.build(),
));
}

if !resources.contains::<AsyncComputeTaskPool>() {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= async_compute_threads;

resources.insert(AsyncComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.build(),
));
}

if !resources.contains::<ComputeTaskPool>() {
// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);

resources.insert(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.build(),
));
}
}
}
2 changes: 1 addition & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ profiler = []

[dependencies]
bevy_hecs = { path = "hecs", features = ["macros", "serialize"], version = "0.1" }
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
bevy_utils = { path = "../bevy_utils", version = "0.1" }
rand = "0.7.2"
rayon = "1.3"
crossbeam-channel = "0.4.2"
fixedbitset = "0.3.0"
downcast-rs = "1.1.1"
Expand Down
6 changes: 6 additions & 0 deletions crates/bevy_ecs/src/resource/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ impl Resources {
self.get_resource_mut(ResourceIndex::Global)
}

/// Returns a clone of the underlying resource, this is helpful when borrowing something
/// cloneable (like a task pool) without taking a borrow on the resource map
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
self.get::<T>().map(|r| (*r).clone())
}

#[allow(clippy::needless_lifetimes)]
pub fn get_local<'a, T: Resource>(&'a self, id: SystemId) -> Option<Ref<'a, T>> {
self.get_resource(ResourceIndex::System(id))
Expand Down
70 changes: 18 additions & 52 deletions crates/bevy_ecs/src/schedule/parallel_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use bevy_hecs::{ArchetypesGeneration, World};
use crossbeam_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet;
use parking_lot::Mutex;
use rayon::ScopeFifo;
use std::{ops::Range, sync::Arc};

/// Executes each schedule stage in parallel by analyzing system dependencies.
Expand Down Expand Up @@ -66,52 +65,6 @@ impl ParallelExecutor {
}
}

/// This can be added as an app resource to control the global `rayon::ThreadPool` used by ecs.
// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
#[derive(Debug, Default, Clone)]
pub struct ParallelExecutorOptions {
/// If some value, we'll set up the thread pool to use at most n threads. See `rayon::ThreadPoolBuilder::num_threads`.
num_threads: Option<usize>,
/// If some value, we'll set up the thread pool's' workers to the given stack size. See `rayon::ThreadPoolBuilder::stack_size`.
stack_size: Option<usize>,
// TODO: Do we also need/want to expose other features (*_handler, etc.)
}

impl ParallelExecutorOptions {
/// Creates a new ParallelExecutorOptions instance
pub fn new() -> Self {
Self::default()
}

/// Sets the num_threads option, using the builder pattern
pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
self.num_threads = num_threads;
self
}

/// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
/// otherwise your application may run into stability and performance issues.
pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
self.stack_size = stack_size;
self
}

/// Creates a new ThreadPoolBuilder based on the current options.
pub(crate) fn create_builder(&self) -> rayon::ThreadPoolBuilder {
let mut builder = rayon::ThreadPoolBuilder::new();

if let Some(num_threads) = self.num_threads {
builder = builder.num_threads(num_threads);
}

if let Some(stack_size) = self.stack_size {
builder = builder.stack_size(stack_size);
}

builder
}
}

#[derive(Debug, Clone)]
pub struct ExecutorStage {
/// each system's set of dependencies
Expand Down Expand Up @@ -262,7 +215,7 @@ impl ExecutorStage {
&mut self,
systems: &[Arc<Mutex<Box<dyn System>>>],
run_ready_type: RunReadyType,
scope: &ScopeFifo<'run>,
scope: &mut bevy_tasks::Scope<'run, ()>,
world: &'run World,
resources: &'run Resources,
) -> RunReadyResult {
Expand Down Expand Up @@ -308,7 +261,8 @@ impl ExecutorStage {
// handle multi-threaded system
let sender = self.sender.clone();
self.running_systems.insert(system_index);
scope.spawn_fifo(move |_| {

scope.spawn(async move {
let mut system = system.lock();
system.run(world, resources);
sender.send(system_index).unwrap();
Expand All @@ -328,6 +282,10 @@ impl ExecutorStage {
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
let compute_pool = resources
.get_cloned::<bevy_tasks::ComputeTaskPool>()
.unwrap();

// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
self.system_dependencies.clear();
Expand Down Expand Up @@ -364,7 +322,8 @@ impl ExecutorStage {
// if there are no upcoming thread local systems, run everything right now
0..systems.len()
};
rayon::scope_fifo(|scope| {

compute_pool.scope(|scope| {
run_ready_result = self.run_ready_systems(
systems,
RunReadyType::Range(run_ready_system_index_range),
Expand All @@ -373,6 +332,7 @@ impl ExecutorStage {
resources,
);
});

loop {
// if all systems in the stage are finished, break out of the loop
if self.finished_systems.count_ones(..) == systems.len() {
Expand All @@ -393,7 +353,7 @@ impl ExecutorStage {
run_ready_result = RunReadyResult::Ok;
} else {
// wait for a system to finish, then run its dependents
rayon::scope_fifo(|scope| {
compute_pool.scope(|scope| {
loop {
// if all systems in the stage are finished, break out of the loop
if self.finished_systems.count_ones(..) == systems.len() {
Expand All @@ -410,7 +370,7 @@ impl ExecutorStage {
resources,
);

// if the next ready system is thread local, break out of this loop/rayon scope so it can be run
// if the next ready system is thread local, break out of this loop/bevy_tasks scope so it can be run
if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
break;
}
Expand Down Expand Up @@ -442,6 +402,7 @@ mod tests {
Commands,
};
use bevy_hecs::{Entity, World};
use bevy_tasks::{ComputeTaskPool, TaskPool};
use fixedbitset::FixedBitSet;
use parking_lot::Mutex;
use std::sync::Arc;
Expand All @@ -455,6 +416,8 @@ mod tests {
fn cross_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));

let mut schedule = Schedule::default();
schedule.add_stage("PreArchetypeChange");
schedule.add_stage("PostArchetypeChange");
Expand Down Expand Up @@ -484,6 +447,8 @@ mod tests {
fn intra_stage_archetype_change_prepare() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));

let mut schedule = Schedule::default();
schedule.add_stage("update");

Expand Down Expand Up @@ -512,6 +477,7 @@ mod tests {
fn schedule() {
let mut world = World::new();
let mut resources = Resources::default();
resources.insert(ComputeTaskPool(TaskPool::default()));
resources.insert(Counter::default());
resources.insert(1.0f64);
resources.insert(2isize);
Expand Down
Loading

0 comments on commit ada462c

Please sign in to comment.