Skip to content

Commit

Permalink
Cluster small table/archetype into single Task in parallel iteration (b…
Browse files Browse the repository at this point in the history
…evyengine#12846)

# Objective

- Fix bevyengine#7303
- bevy would spawn a lot of tasks during parallel iteration when a query matches
one large storage and many small storages, which significantly increases the
scheduling overhead.

## Solution

- collect multiple small storages into a single task
  • Loading branch information
re0312 committed Apr 4, 2024
1 parent 344e28d commit 4ca8cf5
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 40 deletions.
14 changes: 14 additions & 0 deletions benches/benches/bevy_ecs/iteration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod iter_simple_sparse_set;
mod iter_simple_system;
mod iter_simple_wide;
mod iter_simple_wide_sparse_set;
mod par_iter_simple;

use heavy_compute::*;

Expand All @@ -27,6 +28,7 @@ criterion_group!(
iter_frag_sparse,
iter_simple,
heavy_compute,
par_iter_simple,
);

fn iter_simple(c: &mut Criterion) {
Expand Down Expand Up @@ -117,3 +119,15 @@ fn iter_frag_sparse(c: &mut Criterion) {
});
group.finish();
}

/// Benchmarks `Query::par_iter_mut` over worlds with increasing archetype
/// fragmentation (0, 10, 100, and 1000 fragmented entities), to measure the
/// task-scheduling overhead of parallel iteration across many small storages.
fn par_iter_simple(c: &mut Criterion) {
    let mut group = c.benchmark_group("par_iter_simple");
    group.warm_up_time(std::time::Duration::from_millis(500));
    group.measurement_time(std::time::Duration::from_secs(4));
    for f in [0, 10, 100, 1000] {
        group.bench_function(format!("with_{}_fragment", f), |b| {
            let mut bench = par_iter_simple::Benchmark::new(f);
            b.iter(move || bench.run());
        });
    }
    // Explicitly finish the group, matching the convention used by the other
    // benchmark functions in this module (e.g. `iter_frag_sparse`).
    group.finish();
}
73 changes: 73 additions & 0 deletions benches/benches/bevy_ecs/iteration/par_iter_simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use bevy_ecs::prelude::*;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use glam::*;

// Bulk payload component; pads each entity so table moves/iteration have
// realistic per-row cost.
#[derive(Component, Copy, Clone)]
struct Transform(Mat4);

// Written by the benchmark's query (`p.0 += v.0`).
#[derive(Component, Copy, Clone)]
struct Position(Vec3);

// Present on every entity but never queried; adds width to the table.
#[derive(Component, Copy, Clone)]
struct Rotation(Vec3);

// Read by the benchmark's query.
#[derive(Component, Copy, Clone)]
struct Velocity(Vec3);

// Marker component parameterized by a bit index. Inserting different subsets
// of `Data::<0>` .. `Data::<15>` fragments entities across many archetypes.
#[derive(Component, Copy, Clone, Default)]
struct Data<const X: u16>(f32);
// The benchmark state: the world plus a cached query over it. The `'w`
// lifetime is required by `QueryState`'s type parameters.
pub struct Benchmark<'w>(World, QueryState<(&'w Velocity, &'w mut Position)>);

/// Inserts a `Data::<B>` marker component onto `entity` when bit `B` of `i`
/// is set. Calling this for every bit of an entity's index scatters entities
/// across up to 2^16 distinct archetypes.
fn insert_if_bit_enabled<const B: u16>(entity: &mut EntityWorldMut, i: u16) {
    let bit_set = (i >> B) & 1 == 1;
    if bit_set {
        entity.insert(Data::<B>(1.0));
    }
}

impl<'w> Benchmark<'w> {
    /// Builds a world with 100_000 identical entities; the first `fragment`
    /// entities additionally receive a bit-pattern-derived set of `Data<N>`
    /// components, spreading them across many small archetypes.
    pub fn new(fragment: u16) -> Self {
        // The parallel query iterator schedules its work on this pool.
        ComputeTaskPool::get_or_init(TaskPool::default);

        let mut world = World::new();

        let entities: Vec<Entity> = world
            .spawn_batch(
                std::iter::repeat((
                    Transform(Mat4::from_scale(Vec3::ONE)),
                    Position(Vec3::X),
                    Rotation(Vec3::X),
                    Velocity(Vec3::X),
                ))
                .take(100_000),
            )
            .collect();

        // Fragment the first `fragment` entities: entity `i` gains `Data::<B>`
        // for each bit `B` set in `i`, so consecutive indices land in
        // different archetypes.
        for (i, entity) in entities.iter().take(fragment as usize).enumerate() {
            let i = i as u16;
            let mut e = world.entity_mut(*entity);
            insert_if_bit_enabled::<0>(&mut e, i);
            insert_if_bit_enabled::<1>(&mut e, i);
            insert_if_bit_enabled::<2>(&mut e, i);
            insert_if_bit_enabled::<3>(&mut e, i);
            insert_if_bit_enabled::<4>(&mut e, i);
            insert_if_bit_enabled::<5>(&mut e, i);
            insert_if_bit_enabled::<6>(&mut e, i);
            insert_if_bit_enabled::<7>(&mut e, i);
            insert_if_bit_enabled::<8>(&mut e, i);
            insert_if_bit_enabled::<9>(&mut e, i);
            insert_if_bit_enabled::<10>(&mut e, i);
            insert_if_bit_enabled::<11>(&mut e, i);
            insert_if_bit_enabled::<12>(&mut e, i);
            insert_if_bit_enabled::<13>(&mut e, i);
            insert_if_bit_enabled::<14>(&mut e, i);
            insert_if_bit_enabled::<15>(&mut e, i);
        }

        let query = world.query::<(&Velocity, &mut Position)>();
        Self(world, query)
    }

    /// Runs one measured iteration: advance every `Position` by its
    /// `Velocity` using parallel query iteration.
    #[inline(never)]
    pub fn run(&mut self) {
        let Self(world, query) = self;
        query
            .par_iter_mut(world)
            .for_each(|(v, mut p)| p.0 += v.0);
    }
}
3 changes: 2 additions & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"]

[features]
trace = []
multi-threaded = ["bevy_tasks/multi-threaded"]
multi-threaded = ["bevy_tasks/multi-threaded", "arrayvec"]
bevy_debug_stepping = []
default = ["bevy_reflect"]

Expand All @@ -30,6 +30,7 @@ rustc-hash = "1.1"
serde = "1"
thiserror = "1.0"
nonmax = "0.5"
arrayvec = { version = "0.7.4", optional = true }

[dev-dependencies]
rand = "0.8"
Expand Down
119 changes: 80 additions & 39 deletions crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1387,58 +1387,99 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
use arrayvec::ArrayVec;

bevy_tasks::ComputeTaskPool::get().scope(|scope| {
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
let tables = unsafe { &world.storages().tables };
let archetypes = world.archetypes();
for storage_id in &self.matched_storage_ids {
if D::IS_DENSE && F::IS_DENSE {
let table_id = storage_id.table_id;
let table = &tables[table_id];
if table.is_empty() {
continue;
let mut batch_queue = ArrayVec::new();
let mut queue_entity_count = 0;

// submit a list of storages which smaller than batch_size as single task
let submit_batch_queue = |queue: &mut ArrayVec<StorageId, 128>| {
if queue.is_empty() {
return;
}
let queue = std::mem::take(queue);
let mut func = func.clone();
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let mut iter = self.iter_unchecked_manual(world, last_run, this_run);
for storage_id in queue {
if D::IS_DENSE && F::IS_DENSE {
let id = storage_id.table_id;
let table = &world.storages().tables.get(id).debug_checked_unwrap();
iter.for_each_in_table_range(&mut func, table, 0..table.entity_count());
} else {
let id = storage_id.archetype_id;
let archetype = world.archetypes().get(id).debug_checked_unwrap();
iter.for_each_in_archetype_range(
&mut func,
archetype,
0..archetype.len(),
);
}
}
});
};

let mut offset = 0;
while offset < table.entity_count() {
let mut func = func.clone();
let len = batch_size.min(table.entity_count() - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let table =
&world.storages().tables.get(table_id).debug_checked_unwrap();
// submit single storage larger than batch_size
let submit_single = |count, storage_id: StorageId| {
for offset in (0..count).step_by(batch_size) {
let mut func = func.clone();
let len = batch_size.min(count - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
if D::IS_DENSE && F::IS_DENSE {
let id = storage_id.table_id;
let table = world.storages().tables.get(id).debug_checked_unwrap();
self.iter_unchecked_manual(world, last_run, this_run)
.for_each_in_table_range(&mut func, table, batch);
});
offset += batch_size;
}
} else {
let archetype_id = storage_id.archetype_id;
let archetype = &archetypes[archetype_id];
if archetype.is_empty() {
continue;
}

let mut offset = 0;
while offset < archetype.len() {
let mut func = func.clone();
let len = batch_size.min(archetype.len() - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let archetype =
world.archetypes().get(archetype_id).debug_checked_unwrap();
} else {
let id = storage_id.archetype_id;
let archetype = world.archetypes().get(id).debug_checked_unwrap();
self.iter_unchecked_manual(world, last_run, this_run)
.for_each_in_archetype_range(&mut func, archetype, batch);
});
offset += batch_size;
}
}
});
}
};

let storage_entity_count = |storage_id: StorageId| -> usize {
if D::IS_DENSE && F::IS_DENSE {
tables[storage_id.table_id].entity_count()
} else {
archetypes[storage_id.archetype_id].len()
}
};

for storage_id in &self.matched_storage_ids {
let count = storage_entity_count(*storage_id);

// skip empty storage
if count == 0 {
continue;
}
// immediately submit large storage
if count >= batch_size {
submit_single(count, *storage_id);
continue;
}
// merge small storage
batch_queue.push(*storage_id);
queue_entity_count += count;

// submit batch_queue
if queue_entity_count >= batch_size || batch_queue.is_full() {
submit_batch_queue(&mut batch_queue);
queue_entity_count = 0;
}
}
submit_batch_queue(&mut batch_queue);
});
}

Expand Down

0 comments on commit 4ca8cf5

Please sign in to comment.