Skip to content

Commit

Permalink
Parallel queries (#292)
Browse files Browse the repository at this point in the history
Add support for Parallel Queries
  • Loading branch information
GrantMoyer authored Sep 8, 2020
1 parent 52ae217 commit 586303f
Show file tree
Hide file tree
Showing 7 changed files with 1,052 additions and 0 deletions.
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ bevy_winit = { path = "crates/bevy_winit", optional = true, version = "0.1" }
[dev-dependencies]
rand = "0.7.2"
serde = { version = "1", features = ["derive"]}
criterion = "0.3"

[[example]]
name = "hello_world"
Expand Down Expand Up @@ -179,6 +180,10 @@ path = "examples/ecs/startup_system.rs"
name = "ecs_guide"
path = "examples/ecs/ecs_guide.rs"

[[example]]
name = "parallel_query"
path = "examples/ecs/parallel_query.rs"

[[example]]
name = "breakout"
path = "examples/game/breakout.rs"
Expand Down Expand Up @@ -242,3 +247,8 @@ path = "examples/window/multiple_windows.rs"
[[example]]
name = "window_settings"
path = "examples/window/window_settings.rs"

[[bench]]
name = "iter"
path = "crates/bevy_tasks/benches/iter.rs"
harness = false
82 changes: 82 additions & 0 deletions crates/bevy_ecs/src/system/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use bevy_hecs::{
Archetype, Component, ComponentError, Entity, Fetch, Query as HecsQuery, QueryOne, Ref, RefMut,
World,
};
use bevy_tasks::ParallelIterator;
use std::marker::PhantomData;

/// Provides scoped access to a World according to a given [HecsQuery]
Expand Down Expand Up @@ -148,6 +149,29 @@ impl<'w, Q: HecsQuery> QueryBorrow<'w, Q> {
iter: None,
}
}

/// Like `iter`, but returns child iterators of at most `batch_size`
/// elements
///
/// Useful for distributing work over a threadpool using the
/// ParallelIterator interface.
///
/// Batch size needs to be chosen based on the task being done in
/// parallel. The elements in each batch are computed serially, while
/// the batches themselves are computed in parallel.
///
/// A too small batch size can cause too much overhead, since scheduling
/// each batch could take longer than running the batch. On the other
/// hand, a too large batch size risks that one batch is still running
/// long after the rest have finished.
pub fn par_iter<'q>(&'q mut self, batch_size: u32) -> ParIter<'q, 'w, Q> {
ParIter {
borrow: self,
archetype_index: 0,
batch_size,
batch: 0,
}
}
}

unsafe impl<'w, Q: HecsQuery> Send for QueryBorrow<'w, Q> {}
Expand Down Expand Up @@ -257,3 +281,61 @@ impl<Q: HecsQuery> ChunkIter<Q> {
}
}
}

/// Batched version of `QueryIter`
pub struct ParIter<'q, 'w, Q: HecsQuery> {
borrow: &'q mut QueryBorrow<'w, Q>,
archetype_index: u32,
batch_size: u32,
batch: u32,
}

impl<'q, 'w, Q: HecsQuery> ParallelIterator<Batch<'q, Q>> for ParIter<'q, 'w, Q> {
type Item = <Q::Fetch as Fetch<'q>>::Item;

fn next_batch(&mut self) -> Option<Batch<'q, Q>> {
loop {
let archetype = self.borrow.archetypes.get(self.archetype_index as usize)?;
let offset = self.batch_size * self.batch;
if offset >= archetype.len() {
self.archetype_index += 1;
self.batch = 0;
continue;
}
if let Some(fetch) = unsafe { Q::Fetch::get(archetype, offset as usize) } {
self.batch += 1;
return Some(Batch {
_marker: PhantomData,
state: ChunkIter {
fetch,
len: self.batch_size.min(archetype.len() - offset),
},
});
} else {
self.archetype_index += 1;
debug_assert_eq!(
self.batch, 0,
"query fetch should always reject at the first batch or not at all"
);
continue;
}
}
}
}

/// A sequence of entities yielded by `ParIter`
pub struct Batch<'q, Q: HecsQuery> {
_marker: PhantomData<&'q ()>,
state: ChunkIter<Q>,
}

impl<'q, 'w, Q: HecsQuery> Iterator for Batch<'q, Q> {
type Item = <Q::Fetch as Fetch<'q>>::Item;

fn next(&mut self) -> Option<Self::Item> {
let components = unsafe { self.state.next()? };
Some(components)
}
}

unsafe impl<'q, Q: HecsQuery> Send for Batch<'q, Q> {}
148 changes: 148 additions & 0 deletions crates/bevy_tasks/benches/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use bevy_tasks::{ParallelIterator, TaskPoolBuilder};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};

struct ParChunks<'a, T>(std::slice::Chunks<'a, T>);
impl<'a, T> ParallelIterator<std::slice::Iter<'a, T>> for ParChunks<'a, T>
where
T: 'a + Send + Sync,
{
type Item = &'a T;

fn next_batch(&mut self) -> Option<std::slice::Iter<'a, T>> {
self.0.next().map(|s| s.iter())
}
}

struct ParChunksMut<'a, T>(std::slice::ChunksMut<'a, T>);
impl<'a, T> ParallelIterator<std::slice::IterMut<'a, T>> for ParChunksMut<'a, T>
where
T: 'a + Send + Sync,
{
type Item = &'a mut T;

fn next_batch(&mut self) -> Option<std::slice::IterMut<'a, T>> {
self.0.next().map(|s| s.iter_mut())
}
}

fn bench_overhead(c: &mut Criterion) {
fn noop(_: &mut usize) {};

let mut v = (0..10000).collect::<Vec<usize>>();
c.bench_function("overhead_iter", |b| {
b.iter(|| {
v.iter_mut().for_each(noop);
})
});

let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("overhead_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
|b, _| {
b.iter(|| {
ParChunksMut(v.chunks_mut(100)).for_each(&pool, noop);
})
},
);
}
group.finish();
}

fn bench_for_each(c: &mut Criterion) {
fn busy_work(n: usize) {
let mut i = n;
while i > 0 {
i = black_box(i - 1);
}
}

let mut v = (0..10000).collect::<Vec<usize>>();
c.bench_function("for_each_iter", |b| {
b.iter(|| {
v.iter_mut().for_each(|x| {
busy_work(10000);
*x *= *x;
});
})
});

let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("for_each_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
|b, _| {
b.iter(|| {
ParChunksMut(v.chunks_mut(100)).for_each(&pool, |x| {
busy_work(10000);
*x *= *x;
});
})
},
);
}
group.finish();
}

fn bench_many_maps(c: &mut Criterion) {
fn busy_doubles(mut x: usize, n: usize) -> usize {
for _ in 0..n {
x = black_box(x.wrapping_mul(2));
}
x
}

let v = (0..10000).collect::<Vec<usize>>();
c.bench_function("many_maps_iter", |b| {
b.iter(|| {
v.iter()
.map(|x| busy_doubles(*x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.for_each(drop);
})
});

let v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("many_maps_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
|b, _| {
b.iter(|| {
ParChunks(v.chunks(100))
.map(|x| busy_doubles(*x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.map(|x| busy_doubles(x, 1000))
.for_each(&pool, drop);
})
},
);
}
group.finish();
}

criterion_group!(benches, bench_overhead, bench_for_each, bench_many_maps);
criterion_main!(benches);
Loading

0 comments on commit 586303f

Please sign in to comment.