Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internalize task distinction into TaskPool #4740

Closed
wants to merge 108 commits into from
Closed
Show file tree
Hide file tree
Changes from 80 commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
aa75703
Internalize task distinction into TaskPool
james7132 May 13, 2022
3f97865
Basic priortization strategy
james7132 May 13, 2022
f0a70b9
Add missing docs
james7132 May 13, 2022
dadfe15
Formatting
james7132 May 13, 2022
3f260b2
Fix WASM
james7132 May 13, 2022
e35e11a
Fix local tests for bevy_task
james7132 May 13, 2022
2eab8fc
Fix CI
james7132 May 13, 2022
02a36b5
Fix wasm32
james7132 May 13, 2022
76a2da0
Fix miri
james7132 May 13, 2022
2a0ae14
More wasm32 tomfoolery
james7132 May 13, 2022
c32bea7
Derp
james7132 May 13, 2022
7911617
Formatting woes
james7132 May 13, 2022
c3e4cff
Fix up TaskPoolBuilder for wasm32
james7132 May 13, 2022
0b67525
Remove unnecessary Send bounds
james7132 May 13, 2022
e87790e
More wasm woes
james7132 May 13, 2022
8c722e3
Formatting
james7132 May 13, 2022
6ad621e
Use run instead custom run_forever
james7132 May 14, 2022
92e8bdf
Remove redundant "or" call
james7132 May 15, 2022
36dd177
Use TaskGroup to specify target group instead
james7132 May 15, 2022
d8f9016
Fix WASM
james7132 May 15, 2022
5d9f71d
Relax WASM bounds
james7132 May 15, 2022
ce9c9c7
Remove more extra shutdown checks
james7132 May 15, 2022
67b3b5a
Clean up thread builder code
james7132 May 15, 2022
e160839
Try removing miri cfg
james7132 May 15, 2022
9b93347
Review fixes for single_threaded_task_pool
james7132 May 16, 2022
291c09b
Add error logs if running without any threads in a given group
james7132 May 16, 2022
5123487
Merge DefaultTaskPoolOptions in.
james7132 May 16, 2022
4ab2bf6
Fix CI
james7132 May 16, 2022
476a190
Update comment about priority
james7132 May 16, 2022
f402434
Add missing doc comment
james7132 May 16, 2022
3454585
More complete builder syntax for building TaskPool
james7132 May 16, 2022
d33eb68
Merge branch 'main' into task-pool-internalization
james7132 May 16, 2022
86d1b24
Review fixes
james7132 May 18, 2022
2f65f20
Remove clone
james7132 May 18, 2022
a202d91
tick instead of try_tick
hymm May 18, 2022
fedb3a4
swap shutdown rx with ticker
hymm May 18, 2022
b6aa476
Merge pull request #2 from hymm/pool-intern-experiment
james7132 May 19, 2022
7d0235a
Add the early-out back in
james7132 May 19, 2022
9d1e974
Improve docs on TaskGroup, remove shortcuts
james7132 May 19, 2022
c4cba7f
Formatting
james7132 May 19, 2022
deff027
Fix typo.
james7132 May 19, 2022
94f6210
Cleanup code
james7132 May 19, 2022
fe56295
Fix WASM
james7132 May 19, 2022
5af3e49
Fix docs
james7132 May 19, 2022
1ba48c6
Fix WASM
james7132 May 19, 2022
ace8227
YAWF: Yet another WASM fix
james7132 May 19, 2022
8d3569a
Review comments
james7132 May 25, 2022
f077cfb
Merge branch 'main' into task-pool-internalization
james7132 May 26, 2022
3efc5a6
Merge branch 'main' into task-pool-internalization
james7132 Jun 1, 2022
b84fbba
Revert scope executor changes
james7132 Jun 1, 2022
e26fc87
Review comments
james7132 Jun 1, 2022
5b47ffb
Update docs
james7132 Jun 1, 2022
b5fc0d2
Embed async_executor into bevy_tasks
james7132 Jun 6, 2022
d5cb31c
Remove once_cell in the executor
james7132 Jun 7, 2022
610140a
Use parking_lot over std locks
james7132 Jun 7, 2022
1a7ce57
Merge branch 'embed-async-executor' into task-pool-internalization
james7132 Jun 7, 2022
41067b2
Internalize task prioritization into executor
james7132 Jun 7, 2022
cf0de23
Properly prioritize tasks in the executor
james7132 Jun 7, 2022
f2fda9b
Fix CI
james7132 Jun 7, 2022
247b9cb
Fix WASM
james7132 Jun 7, 2022
fa997d9
Remove the unnecessary RwLock: we know how many runners we need
james7132 Jun 7, 2022
528493e
A bit of code cleanup
james7132 Jun 7, 2022
0e55d7e
Formatting
james7132 Jun 7, 2022
23e7372
Small cleanup, avoid overhead of Iterator::chain
james7132 Jun 8, 2022
981d4ae
Split out sleepers to properly notify them
james7132 Jun 8, 2022
7fb8d25
Replace local queue Arcs with lifetimed references
james7132 Jun 8, 2022
a67b7ff
Remove redundant State reference from Runner
james7132 Jun 8, 2022
96fdaf2
Remove the thread local lookup by embedding the RNG into the Runner
james7132 Jun 8, 2022
ad2bae0
Attempt to directly write to a local queue before going to the global…
james7132 Jun 8, 2022
6a4c0aa
Avoid assert in hot loop while stealing
james7132 Jun 8, 2022
8bf519c
Merge branch 'main' into task-pool-internalization
james7132 Jun 9, 2022
cb4b57b
Merge branch 'main' into task-pool-internalization
james7132 Jun 9, 2022
637fdc8
Drop unused event-listener dependency
james7132 Jun 9, 2022
7deacd3
Revert directly writing to local queues
james7132 Jun 9, 2022
0fa8eb1
Small code cleanup
james7132 Jun 9, 2022
9bfa2d0
Mix local and global queue fetches by priority
james7132 Jun 10, 2022
a3bed82
Remove active slab
james7132 Jun 10, 2022
c3f014f
Formatting
james7132 Jun 10, 2022
018adfd
Remove unnecessary indexing when ticking
james7132 Jun 10, 2022
189c53d
Cleanup: Reduce LocalExecutor to a single queue
james7132 Jun 12, 2022
560ff25
Remove the unsafe unwrap_unchecked without a good perf reason
james7132 Jun 12, 2022
1ada122
Remove atomics from Runner
james7132 Jun 14, 2022
4c1f26f
Merge branch 'main' into task-pool-internalization
james7132 Jun 14, 2022
67c866b
Merge branch 'less-atomics' into task-pool-internalization
james7132 Jun 14, 2022
5a2de23
Fix certain thread groups not being used. Add throttling.
james7132 Jun 14, 2022
38d9936
Use st3 for local queues and work-stealing
james7132 Jun 15, 2022
bc469af
Only notify when stealing after a search
james7132 Jun 15, 2022
bba24f1
Refactor search throttling
james7132 Jun 16, 2022
a0f139e
Merge branch 'main' into task-pool-internalization
james7132 Jun 16, 2022
b34b5f6
Formatting
james7132 Jun 16, 2022
93134c8
Shut up clippy
james7132 Jun 16, 2022
2178238
Limit lower priority stealing and notifications
james7132 Jun 17, 2022
71822ee
Replace the async_channel dependency with a lower level event_listener
james7132 Jun 17, 2022
5f99996
Split out LocalExecutor into it's own file. Shrink dependencies on wa…
james7132 Jun 17, 2022
acf56cf
Merge branch 'main' into task-pool-internalization
james7132 Jun 21, 2022
558db08
Formatting
james7132 Jun 21, 2022
310c3b9
Merge branch 'main' into task-pool-internalization
james7132 Nov 5, 2022
0ca76c9
Attempt to fix CI
james7132 Nov 5, 2022
1378672
Update st3
james7132 Nov 5, 2022
994a250
Use let-else
james7132 Nov 5, 2022
18d2a8f
Fix locality tests
james7132 Nov 5, 2022
9cb309b
Fix CI
james7132 Nov 5, 2022
b4f700c
Fix WASM
james7132 Nov 5, 2022
30aad1e
Make LocalExecutor not require atomics
james7132 Nov 5, 2022
2b0d2d7
Add simple executor to avoid using Executor in scope
james7132 Nov 5, 2022
c380b57
Clean up thread spawn logic
james7132 Nov 15, 2022
9be6fe2
Merge branch 'main' into task-pool-internalization
james7132 Nov 15, 2022
07e1acf
Formatting
james7132 Nov 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bevy_ecs::prelude::*;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use bevy_tasks::TaskPool;
use glam::*;

#[derive(Component, Copy, Clone)]
Expand All @@ -18,7 +18,7 @@ pub struct Benchmark(World, Box<dyn System<In = (), Out = ()>>);

impl Benchmark {
pub fn new() -> Self {
ComputeTaskPool::init(TaskPool::default);
TaskPool::init(TaskPool::default);

let mut world = World::default();

Expand Down
8 changes: 4 additions & 4 deletions benches/benches/bevy_tasks/iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bevy_tasks::{ParallelIterator, TaskPoolBuilder};
use bevy_tasks::{ParallelIterator, TaskPool};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};

struct ParChunks<'a, T>(std::slice::Chunks<'a, T>);
Expand Down Expand Up @@ -34,7 +34,7 @@ fn bench_overhead(c: &mut Criterion) {
let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("overhead_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
let pool = TaskPool::build().threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
Expand Down Expand Up @@ -69,7 +69,7 @@ fn bench_for_each(c: &mut Criterion) {
let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("for_each_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
let pool = TaskPool::build().threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
Expand Down Expand Up @@ -115,7 +115,7 @@ fn bench_many_maps(c: &mut Criterion) {
let v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("many_maps_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
let pool = TaskPool::build().threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_asset/src/asset_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use anyhow::Result;
use bevy_ecs::system::{Res, ResMut};
use bevy_log::warn;
use bevy_tasks::IoTaskPool;
use bevy_tasks::{TaskGroup, TaskPool};
use bevy_utils::{Entry, HashMap, Uuid};
use crossbeam_channel::TryRecvError;
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -374,8 +374,8 @@ impl AssetServer {
pub(crate) fn load_untracked(&self, asset_path: AssetPath<'_>, force: bool) -> HandleId {
let server = self.clone();
let owned_path = asset_path.to_owned();
IoTaskPool::get()
.spawn(async move {
TaskPool::get()
.spawn(TaskGroup::IO, async move {
if let Err(err) = server.load_async(owned_path, force).await {
warn!("{}", err);
}
Expand Down Expand Up @@ -616,7 +616,7 @@ mod test {

fn setup(asset_path: impl AsRef<Path>) -> AssetServer {
use crate::FileAssetIo;
IoTaskPool::init(Default::default);
TaskPool::init(Default::default);
AssetServer::new(FileAssetIo::new(asset_path, false))
}

Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_asset/src/debug_asset_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bevy_ecs::{
schedule::SystemLabel,
system::{NonSendMut, Res, ResMut, SystemState},
};
use bevy_tasks::{IoTaskPool, TaskPoolBuilder};
use bevy_tasks::TaskPool;
use bevy_utils::HashMap;
use std::{
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -58,9 +58,9 @@ impl<T: Asset> Default for HandleMap<T> {

impl Plugin for DebugAssetServerPlugin {
fn build(&self, app: &mut bevy_app::App) {
IoTaskPool::init(|| {
TaskPoolBuilder::default()
.num_threads(2)
TaskPool::init(|| {
TaskPool::build()
.max_threads(2)
.thread_name("Debug Asset Server IO Task Pool".to_string())
.build()
});
Expand Down
13 changes: 4 additions & 9 deletions crates/bevy_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
//! This crate provides core functionality for Bevy Engine.

mod name;
mod task_pool_options;

pub use bytemuck::{bytes_of, cast_slice, Pod, Zeroable};
pub use name::*;
pub use task_pool_options::*;

pub mod prelude {
//! The Bevy Core Prelude.
#[doc(hidden)]
pub use crate::{DefaultTaskPoolOptions, Name};
pub use crate::Name;
}

use bevy_app::prelude::*;
use bevy_ecs::entity::Entity;
use bevy_tasks::TaskPool;
use bevy_utils::HashSet;
use std::ops::Range;

Expand All @@ -25,12 +24,8 @@ pub struct CorePlugin;

impl Plugin for CorePlugin {
fn build(&self, app: &mut App) {
// Setup the default bevy task pools
app.world
.get_resource::<DefaultTaskPoolOptions>()
.cloned()
.unwrap_or_default()
.create_default_pools();
// Setup the default bevy task pool if not already set up
app.world.init_resource::<TaskPool>();

app.register_type::<Entity>().register_type::<Name>();

Expand Down
153 changes: 0 additions & 153 deletions crates/bevy_core/src/task_pool_options.rs

This file was deleted.

6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mod tests {
query::{Added, ChangeTrackers, Changed, FilteredAccess, With, Without, WorldQuery},
world::{Mut, World},
};
use bevy_tasks::{ComputeTaskPool, TaskPool};
use bevy_tasks::TaskPool;
use std::{
any::TypeId,
sync::{
Expand Down Expand Up @@ -375,7 +375,7 @@ mod tests {

#[test]
fn par_for_each_dense() {
ComputeTaskPool::init(TaskPool::default);
TaskPool::init(TaskPool::default);
let mut world = World::new();
let e1 = world.spawn().insert(A(1)).id();
let e2 = world.spawn().insert(A(2)).id();
Expand All @@ -397,7 +397,7 @@ mod tests {

#[test]
fn par_for_each_sparse() {
ComputeTaskPool::init(TaskPool::default);
TaskPool::init(TaskPool::default);
let mut world = World::new();
let e1 = world.spawn().insert(SparseStored(1)).id();
let e2 = world.spawn().insert(SparseStored(2)).id();
Expand Down
12 changes: 6 additions & 6 deletions crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
storage::TableId,
world::{World, WorldId},
};
use bevy_tasks::ComputeTaskPool;
use bevy_tasks::{TaskGroup, TaskPool};
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use fixedbitset::FixedBitSet;
Expand Down Expand Up @@ -750,7 +750,7 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
/// write-queries.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// The [`TaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
#[inline]
pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(
Expand All @@ -775,7 +775,7 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// The [`TaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
#[inline]
pub fn par_for_each_mut<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
Expand All @@ -802,7 +802,7 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
/// This can only be called for read-only queries.
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// The [`TaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// # Safety
Expand Down Expand Up @@ -918,7 +918,7 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
/// iter() method, but cannot be chained like a normal [`Iterator`].
///
/// # Panics
/// The [`ComputeTaskPool`] is not initialized. If using this from a query that is being
/// The [`TaskPool`] is not initialized. If using this from a query that is being
/// initialized and run from the ECS scheduler, this should never panic.
///
/// # Safety
Expand All @@ -941,7 +941,7 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryState::for_each_unchecked_manual, QueryState::many_for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
ComputeTaskPool::get().scope(|scope| {
TaskPool::get().scope(TaskGroup::Compute, |scope| {
if QF::IS_DENSE && <QueryFetch<'static, F>>::IS_DENSE {
let tables = &world.storages().tables;
for table_id in &self.matched_table_ids {
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
world::World,
};
use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
use bevy_tasks::{Scope, TaskGroup, TaskPool};
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use fixedbitset::FixedBitSet;
Expand Down Expand Up @@ -123,7 +123,7 @@ impl ParallelSystemExecutor for ParallelExecutor {
}
}

ComputeTaskPool::init(TaskPool::default).scope(|scope| {
TaskPool::init(TaskPool::default).scope(TaskGroup::Compute, |scope| {
self.prepare_systems(scope, systems, world);
let parallel_executor = async {
// All systems have been ran if there are no queued or running systems.
Expand Down
1 change: 0 additions & 1 deletion crates/bevy_ecs/src/system/commands/parallel_scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub struct ParallelCommandsState {
/// Example:
/// ```
/// # use bevy_ecs::prelude::*;
/// # use bevy_tasks::ComputeTaskPool;
/// #
/// # #[derive(Component)]
/// # struct Velocity;
Expand Down
Loading