Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a bounded SPSC queue #338

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This crate provides a set of tools for concurrent programming:
* [`deque`], work-stealing deques for building task schedulers.
* [`ArrayQueue`], a bounded MPMC queue that allocates a fixed-capacity buffer on construction.
* [`SegQueue`], an unbounded MPMC queue that allocates small buffers, segments, on demand.
* [`spsc`], a bounded SPSC queue that allocates a fixed-capacity buffer on construction.

#### Memory management

Expand All @@ -49,6 +50,7 @@ This crate provides a set of tools for concurrent programming:
[`deque`]: https://docs.rs/crossbeam/*/crossbeam/deque/index.html
[`ArrayQueue`]: https://docs.rs/crossbeam/*/crossbeam/queue/struct.ArrayQueue.html
[`SegQueue`]: https://docs.rs/crossbeam/*/crossbeam/queue/struct.SegQueue.html
[`spsc`]: https://docs.rs/crossbeam-queue/*/crossbeam_queue/spsc/index.html
[`channel`]: https://docs.rs/crossbeam/*/crossbeam/channel/index.html
[`Parker`]: https://docs.rs/crossbeam/*/crossbeam/sync/struct.Parker.html
[`ShardedLock`]: https://docs.rs/crossbeam/*/crossbeam/sync/struct.ShardedLock.html
Expand Down
16 changes: 12 additions & 4 deletions crossbeam-channel/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ lockfree = "0.5.1"
futures-preview = "0.2.2"
mpmc = "0.1.5"

[[bin]]
name = "array_queue"
path = "array_queue.rs"

[[bin]]
name = "atomicring"
path = "atomicring.rs"
Expand Down Expand Up @@ -45,14 +49,18 @@ path = "futures-channel.rs"
name = "lockfree"
path = "lockfree.rs"

[[bin]]
name = "mpmc"
path = "mpmc.rs"

[[bin]]
name = "mpsc"
path = "mpsc.rs"

[[bin]]
name = "segqueue"
path = "segqueue.rs"
name = "seg_queue"
path = "seg_queue.rs"

[[bin]]
name = "mpmc"
path = "mpmc.rs"
name = "spsc"
path = "spsc.rs"
119 changes: 119 additions & 0 deletions crossbeam-channel/benchmarks/array_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
extern crate crossbeam;

use crossbeam::queue::ArrayQueue;
use std::thread;

mod message;

const MESSAGES: usize = 5_000_000;
const THREADS: usize = 4;

fn seq() {
let q = ArrayQueue::new(MESSAGES);

for i in 0..MESSAGES {
q.push(message::new(i)).unwrap();
}

for _ in 0..MESSAGES {
q.pop().unwrap();
}
}

fn spsc() {
let q = ArrayQueue::new(MESSAGES);

crossbeam::scope(|scope| {
scope.spawn(|_| {
for i in 0..MESSAGES {
q.push(message::new(i)).unwrap();
}
});

for _ in 0..MESSAGES {
loop {
if q.pop().is_err() {
thread::yield_now();
} else {
break;
}
}
}
})
.unwrap();
}

fn mpsc() {
let q = ArrayQueue::new(MESSAGES);

crossbeam::scope(|scope| {
for _ in 0..THREADS {
scope.spawn(|_| {
for i in 0..MESSAGES / THREADS {
q.push(message::new(i)).unwrap();
}
});
}

for _ in 0..MESSAGES {
loop {
if q.pop().is_err() {
thread::yield_now();
} else {
break;
}
}
}
})
.unwrap();
}

fn mpmc() {
let q = ArrayQueue::new(MESSAGES);

crossbeam::scope(|scope| {
for _ in 0..THREADS {
scope.spawn(|_| {
for i in 0..MESSAGES / THREADS {
q.push(message::new(i)).unwrap();
}
});
}

for _ in 0..THREADS {
scope.spawn(|_| {
for _ in 0..MESSAGES / THREADS {
loop {
if q.pop().is_err() {
thread::yield_now();
} else {
break;
}
}
}
});
}
})
.unwrap();
}

fn main() {
macro_rules! run {
($name:expr, $f:expr) => {
let now = ::std::time::Instant::now();
$f;
let elapsed = now.elapsed();
println!(
"{:25} {:15} {:7.3} sec",
$name,
"Rust ArrayQueue",
elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1e9
);
};
}

run!("bounded_mpmc", mpmc());
run!("bounded_mpsc", mpsc());
run!("bounded_seq", seq());
run!("bounded_spsc", spsc());
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn main() {
println!(
"{:25} {:15} {:7.3} sec",
$name,
"Rust segqueue",
"Rust SegQueue",
elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1e9
);
};
Expand Down
62 changes: 62 additions & 0 deletions crossbeam-channel/benchmarks/spsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
extern crate crossbeam;

use crossbeam::queue;
use std::thread;

mod message;

const MESSAGES: usize = 5_000_000;

fn seq() {
let (p, c) = queue::spsc::new(MESSAGES);

for i in 0..MESSAGES {
p.push(message::new(i)).unwrap();
}

for _ in 0..MESSAGES {
c.pop().unwrap();
}
}

fn spsc() {
let (p, c) = queue::spsc::new(MESSAGES);

crossbeam::scope(|scope| {
scope.spawn(move |_| {
for i in 0..MESSAGES {
p.push(message::new(i)).unwrap();
}
});

for _ in 0..MESSAGES {
loop {
if c.pop().is_err() {
thread::yield_now();
} else {
break;
}
}
}
})
.unwrap();
}

fn main() {
macro_rules! run {
($name:expr, $f:expr) => {
let now = ::std::time::Instant::now();
$f;
let elapsed = now.elapsed();
println!(
"{:25} {:15} {:7.3} sec",
$name,
"Rust spsc",
elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1e9
);
};
}

run!("bounded_seq", seq());
run!("bounded_spsc", spsc());
}
2 changes: 2 additions & 0 deletions crossbeam-queue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ This crate provides concurrent queues that can be shared among threads:

* [`ArrayQueue`], a bounded MPMC queue that allocates a fixed-capacity buffer on construction.
* [`SegQueue`], an unbounded MPMC queue that allocates small buffers, segments, on demand.
* [`spsc`], a bounded SPSC queue that allocates a fixed-capacity buffer on construction.

[`ArrayQueue`]: https://docs.rs/crossbeam-queue/*/crossbeam_queue/struct.ArrayQueue.html
[`SegQueue`]: https://docs.rs/crossbeam-queue/*/crossbeam_queue/struct.SegQueue.html
[`spsc`]: https://docs.rs/crossbeam-queue/*/crossbeam_queue/spsc/index.html

## Usage

Expand Down
10 changes: 5 additions & 5 deletions crossbeam-queue/src/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}

impl<T> ArrayQueue<T> {
/// Creates a new bounded queue with the given capacity.
/// Creates a bounded queue with the given capacity.
///
/// # Panics
///
Expand Down Expand Up @@ -265,10 +265,10 @@ impl<T> ArrayQueue<T> {
) {
Ok(_) => {
// Read the value from the slot and update the stamp.
let msg = unsafe { slot.value.get().read() };
let value = unsafe { slot.value.get().read() };
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Ok(msg);
return Ok(value);
}
Err(h) => {
head = h;
Expand Down Expand Up @@ -404,9 +404,9 @@ impl<T> Drop for ArrayQueue<T> {
// Get the index of the head.
let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);

// Loop over all slots that hold a message and drop them.
// Loop over all slots that hold a value and drop them.
for i in 0..self.len() {
// Compute the index of the next slot holding a message.
// Compute the index of the next slot holding a value.
let index = if hix + i < self.cap {
hix + i
} else {
Expand Down
3 changes: 3 additions & 0 deletions crossbeam-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
//!
//! * [`ArrayQueue`], a bounded MPMC queue that allocates a fixed-capacity buffer on construction.
//! * [`SegQueue`], an unbounded MPMC queue that allocates small buffers, segments, on demand.
//! * [`spsc`], a bounded SPSC queue that allocates a fixed-capacity buffer on construction.
//!
//! [`ArrayQueue`]: struct.ArrayQueue.html
//! [`SegQueue`]: struct.SegQueue.html
//! [`spsc`]: spsc/index.html

#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
Expand All @@ -20,3 +22,4 @@ mod seg_queue;
pub use self::array_queue::ArrayQueue;
pub use self::err::{PopError, PushError};
pub use self::seg_queue::SegQueue;
pub mod spsc;
2 changes: 1 addition & 1 deletion crossbeam-queue/src/seg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> SegQueue<T> {
/// Creates a new unbounded queue.
/// Creates a unbounded queue.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Creates a unbounded queue.
/// Creates an unbounded queue.

///
/// # Examples
///
Expand Down
Loading