Add a bounded SPSC queue #338
base: master
Conversation
I cannot say thank you enough, but I'll try: Thank you!!!

This is fantastic. Thanks!

Thanks, that would be a way to go now 🎉
crossbeam-queue/src/spsc.rs
```rust
///
/// let (p, c) = spsc::<i32>(100);
/// ```
pub fn spsc<T>(cap: usize) -> (Producer<T>, Consumer<T>) {
```
It looks like the input argument really is a `NonZeroUsize`.
True, but `NonZeroUsize` would be more appropriate for memory layout optimizations than for ad-hoc uses like this one. There's little advantage of `spsc(NonZeroUsize::new(n).unwrap())` over `spsc(n)`. :)
For completeness, the rationale for my comment above was to remove the runtime-panic behavior for something that can be encoded at the type level. I'm fine with your call if you think it's not worth the ergonomic cost.
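For illustration, here is a minimal compilable sketch of the type-level alternative being discussed (the function name is hypothetical, and the real constructor would return a producer/consumer pair; the PR itself keeps the plain `usize` argument):

```rust
use std::num::NonZeroUsize;

// Hypothetical constructor that moves the "capacity must be positive"
// check from a runtime panic into the type system.
fn spsc_nonzero(cap: NonZeroUsize) -> usize {
    // A real constructor would allocate a buffer of `cap.get()` slots;
    // we just return the capacity to keep the sketch self-contained.
    cap.get()
}

fn main() {
    // The caller now has to prove up front that the capacity is non-zero:
    let cap = NonZeroUsize::new(100).expect("capacity must be > 0");
    assert_eq!(spsc_nonzero(cap), 100);
}
```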
```rust
/// assert_eq!(c.pop(), Ok(10));
/// assert_eq!(c.pop(), Err(PopError));
/// ```
pub fn pop(&self) -> Result<T, PopError> {
```
From a consumer point of view, why not an `Option<T>`?
Because `push` returns a `Result<(), PushError<T>>`, I'm returning a `Result` here for consistency. But otherwise I agree, `Option<T>` would be fine as well.
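For what it's worth, the two signatures convert into each other trivially. A tiny self-contained demonstration (with a stand-in `PopError` type, since the conversion isn't tied to the queue itself):

```rust
// Stand-in for the PR's error type, just so this snippet compiles on its own.
#[derive(Debug, PartialEq)]
struct PopError;

fn main() {
    let full: Result<i32, PopError> = Ok(10);
    let empty: Result<i32, PopError> = Err(PopError);
    // Result::ok converts to the Option-based style losslessly here,
    // because PopError carries no extra information.
    assert_eq!(full.ok(), Some(10));
    assert_eq!(empty.ok(), None);
}
```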
```rust
impl<T> Producer<T> {
    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
```
This may sound silly, but I'd be happy if this behavior-when-full were stated a bit more prominently in the module docs.
Sorry, not sure what you mean exactly. Would you be happy if the first sentence said the following?

```rust
/// Attempts to push an element into the queue, returning it back on failure.
```
Sorry if I was unclear, I meant for the whole spsc module (rustdoc and/or README). Something like "a bounded SPSC queue that allocates a fixed-capacity buffer on construction, preventing insertions when full".
```rust
    tail: Cell<usize>,
}

unsafe impl<T: Send> Send for Producer<T> {}
```
Would making just the `Inner` itself `Send + Sync` work? That way, you'll need only one impl.
I think that would work too, yeah. Although I personally prefer specifying these kinds of invariants at the API boundaries rather than inside the implementation and hoping auto traits work as intended... :)
Leveraging auto-traits is in theory a good thing because it helps to catch bugs. For example, if you add an `Rc<T>` to `Producer` in the future, the auto-trait approach will catch it (as in, `Producer` wouldn't be `Send`), while a manual unsafe impl will lead to unsound code. For simple cases it doesn't matter, though.
That's a good point, but it's a double-edged sword. Relying on auto-traits can be risky when all of the following are true:
- Auto-traits are manually implemented for private types.
- Auto-traits are automatically inferred for public types.
- There's a layer of unsafe code between private and public types.
The problem is that the layer of unsafe code can break invariants inferred from auto-traits implemented on private types.
Here's an example. If we relied on `Inner: Send + Sync` and replaced all the `Cell<usize>`s with `AtomicUsize`s, then `Sync` would be inferred for `Producer` and `Consumer`, which would be incorrect and could cause data races. That's because the unsafe code in `Producer` and `Consumer` already assumes that they don't implement `Sync`.
So maybe auto-traits are good only when manually implemented on the innermost layer of safe code? But in any case, I think this queue is simple enough that we'd be fine either way.
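To make the boundary-impl approach concrete, here is a compilable sketch (an editor's illustration simplified from this PR, not its actual code). Because `Inner` contains `Cell`s, `Producer` is not automatically `Send`, and the manual impl states exactly the invariant being relied on:

```rust
use std::cell::Cell;
use std::sync::Arc;

// Simplified stand-in for the queue's shared state.
struct Inner<T> {
    slots: Vec<Cell<Option<T>>>,
}

pub struct Producer<T> {
    inner: Arc<Inner<T>>,
    // Cached index, mutated through &self - this is why Producer must
    // never be Sync (two threads sharing a &Producer would race on it).
    tail: Cell<usize>,
}

// Stated at the API boundary: a Producer may be *moved* to another
// thread (Send), but it is deliberately not Sync.
unsafe impl<T: Send> Send for Producer<T> {}

fn main() {
    let inner = Arc::new(Inner { slots: Vec::new() });
    let p: Producer<i32> = Producer { inner, tail: Cell::new(0) };
    // Moving the producer to another thread compiles only because of the
    // manual Send impl above; the auto-trait inference alone would reject it.
    std::thread::spawn(move || drop(p)).join().unwrap();
}
```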
```rust
/// assert_eq!(p.push(10), Ok(()));
/// assert_eq!(p.push(20), Err(PushError(20)));
/// ```
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
```
Given the single-producer nature of this thing, I'd expect `&mut self` here, but I guess it doesn't really matter and makes the API more flexible?
Exactly - I just made an effort to avoid requiring `&mut self`, since mutability can be annoying to deal with at times.
Great work! What's the status on this?
The two SPSC ringbuffer implementations I know have a way to read/write multiple "items" at a time (and a way of checking how much space there is to read/write). Would it be feasible to add such a feature to this implementation? The main use case would be a ringbuffer of audio samples (e.g. `f32` values). Here are two examples: the JACK ringbuffer (docs, code) has `jack_ringbuffer_get_read_vector()` and `jack_ringbuffer_get_write_vector()`, and the PortAudio ringbuffer (docs, code) has `PaUtil_GetRingBufferReadRegions()` and `PaUtil_GetRingBufferWriteRegions()`. I guess instead of (unsafe) raw pointers it would make more sense to provide (safe) slices.
@alexbool The PR is blocked mostly by the fact that I'm not sure what changes to anticipate in the future and how to structure the modules. Right now I'm thinking we should perhaps move the SPSC queue (types `Producer` and `Consumer`, plus the constructor) into its own submodule. So the idea is to structure the crate like this:

```rust
mod array {
    mod spsc {
        struct Producer<T>;
        struct Consumer<T>;
        fn new(cap: usize) -> (Producer<T>, Consumer<T>);
    }
    mod mpsc {
        // ...
    }
    mod spmc {
        // ...
    }
    struct ArrayQueue<T>; // mpmc version
}

mod seg {
    mod spsc {
        struct Producer<T>;
        struct Consumer<T>;
        fn new() -> (Producer<T>, Consumer<T>);
    }
    mod mpsc {
        // ...
    }
    mod spmc {
        // ...
    }
    struct SegQueue<T>; // mpmc version
}

use array::ArrayQueue;
use seg::SegQueue;
```

How does that look?
@mgeier Interesting! I wonder how we should expose a safe interface for that in Rust. Note that we can't just return slices into the buffer, because parts of it may be uninitialized.

It's a bit tricky, but we can probably figure out a reasonable interface... Another option is to expose methods for batched reads and writes, e.g.:

```rust
impl<T> Consumer<T> {
    fn read_batch(&self, dest: &mut Vec<T>);
}

impl<T> Producer<T> {
    fn write_batch(&self, src: &mut Vec<T>);
}
```
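As a rough illustration of the intended semantics, here is an editor's sketch with a plain `Vec` standing in for the queue's storage (the real implementation would of course use the ring buffer and atomic indices):

```rust
// Move as many items as fit from `src` into `dst`, whose capacity is `cap`.
fn write_batch(dst: &mut Vec<i32>, cap: usize, src: &mut Vec<i32>) {
    let free = cap - dst.len();
    let n = free.min(src.len());
    dst.extend(src.drain(..n));
}

fn main() {
    let mut queue = vec![1, 2];
    let mut pending = vec![3, 4, 5, 6];
    write_batch(&mut queue, 5, &mut pending);
    assert_eq!(queue, [1, 2, 3, 4, 5]);
    // The item that didn't fit stays in the source vector.
    assert_eq!(pending, [6]);
}
```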
@stjepang I'm a total Rust beginner and I'm not at all an expert on lock-free data structures. Please bear that in mind! I didn't think about uninitialized memory. The implementations I mentioned just operate on plain bytes of memory; I guess that's not a serious approach for Rust, right? Would it be possible to somehow default-initialize the whole underlying buffer? I guess it would be fine if this feature were only available for a somewhat "trivial" subset of types.
If a user requests a "read slice", this memory area must of course be "blocked" for writing (and for other readers as well, but it is a "single reader" queue anyway). In both the JACK and the PortAudio examples, callers of the respective "get regions" function are expected to call a separate "advance" function once they are finished with the data. I would hope that with the lifetimes of Rust slices, it would not be necessary to call a separate function when finished accessing the data. But of course I have no idea if this would actually work.
Regarding the `Vec`-based batch methods: this may force an unnecessary copy of data in some cases, because you need the data to live in some contiguous memory before/after the write/read operation. If you have e.g. strided data (e.g. interleaved audio channels), you'll have to make an additional copy. UPDATE: The Rust bindings for JACK provide access to its ring buffer. They use this API for writing multiple "items" at once:
See https://docs.rs/jack/0.6.0/jack/struct.RingBufferWriter.html#method.get_vector
@mgeier Here's another try - how about the following interface?

```rust
impl<T> Consumer<T> {
    fn pop_batch(&mut self) -> PopBatch<'_, T>;
}

impl<T> Producer<T> {
    fn push_batch(&mut self) -> PushBatch<'_, T>;
}

struct PopBatch<'a, T>;

impl<T> PopBatch<'_, T> {
    fn len(&self) -> usize;
    fn pop(&self) -> Result<T, PopError>;
}

struct PushBatch<'a, T>;

impl<T> PushBatch<'_, T> {
    fn len(&self) -> usize;
    fn push(&self) -> Result<(), PushError<T>>;
}
```

The idea is that we start a batch by calling `pop_batch()` or `push_batch()`. This way each individual pop/push operation in a batch will only do a single memcpy to transfer the value, whereas each regular non-batched pop/push operation updates the atomic indices every single time. Would this work for you? Or is it really important that you're able to directly access contiguous slices of memory inside the queue?
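The performance argument here is that index updates are amortized over the batch. A minimal sketch of the difference (editor's illustration, not the PR's code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Shared tail index, as a stand-in for the queue's atomic state.
struct Tail(AtomicUsize);

impl Tail {
    // Non-batched push: publishes the index once per element.
    fn push_one(&self, local: &mut usize) {
        *local += 1;
        self.0.store(*local, Ordering::Release);
    }
    // Batched push: accumulates locally and publishes once per batch.
    fn push_many(&self, local: &mut usize, n: usize) {
        *local += n;
        self.0.store(*local, Ordering::Release);
    }
}

fn main() {
    let tail = Tail(AtomicUsize::new(0));
    let mut local = 0;
    for _ in 0..3 {
        tail.push_one(&mut local); // three atomic stores
    }
    tail.push_many(&mut local, 5); // one atomic store for five items
    assert_eq!(tail.0.load(Ordering::Acquire), 8);
}
```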
Actually, we could then also add the ability for the batches to access direct slices of memory when `T` is a plain copyable type:

```rust
impl<T: Copy + Zeroable> PopBatch<'_, T> {
    fn as_slice(&self) -> &[T];
    fn advance(&mut self, n: usize);
}

impl<T: Copy + Zeroable> PushBatch<'_, T> {
    fn as_slice(&mut self) -> &mut [T];
    fn advance(&mut self, n: usize);
}
```

For example, in order to push a bunch of elements, we would call `as_slice()`, write the elements into the returned slice, and then call `advance()` with the number of elements written.
Thanks @stjepang, this looks very promising! But we would still need two slices, right? I guess something like this (probably using the plural `as_slices`):

```rust
fn as_slices(&mut self) -> (&mut [T], &mut [T]);
```

I don't think that the additional methods are strictly necessary. If anything, I think some kind of "peek" functionality would be more interesting. But for the cases where the "slices" functionality is available, a separate "peek" method is not necessary, because we can simply "peek" into the slices.
Assuming that the slice-based methods are available, I guess it wouldn't make too much sense to also call the single-element methods on a batch. If this works:

```rust
{
    let mut mybatch = myqueue.push_batch();
    let (slice1, slice2) = mybatch.as_slices();
    // write to slices
    mybatch.advance(some_amount);
}
```

... shouldn't something like this also work?

```rust
let (slice1, slice2) = myqueue.write_slices();
// write to slices
myqueue.advance_write_index(some_amount);
```

(I probably missed a detail or two.)
Well, I don't know if it would be that important for me personally. But if we want to convince low-level audio programmers to switch to Rust, we should try to provide library constructs with as little overhead as possible. Having to make unnecessary copies of audio data is not a good selling point!
@mgeier Thanks for your patience, I really appreciate it! After thinking a bit more about it, I believe we could implement the following API:

```rust
impl<T: Copy + Default> Producer<T> {
    fn write_slices(&mut self) -> (&mut [T], &mut [T]);
    fn advance(&self, n: usize);
}

impl<T: Copy> Consumer<T> {
    fn read_slices(&self) -> (&[T], &[T]);
    fn advance(&self, n: usize);
}
```
Two tricks are necessary for ensuring safety: the buffer has to be initialized up front (which is where the `Default` bound comes in), and `advance(n)` has to check `n` against the length of the previously returned slices.
Neither of these two should really be a performance hit since they can be optimized away very well. Would you be happy with this API?

Alternative 1: …

Alternative 2: It would also be possible for …
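To make the "two slices" shape concrete: because the readable/writable region can wrap around the end of the buffer, it splits into at most two contiguous pieces. A self-contained sketch (editor's illustration, not the PR's code) of how such regions are derived from an index and a length:

```rust
// Split the readable region of a ring buffer into its (at most) two
// contiguous parts, given the read index and the number of readable items.
fn read_regions(buf: &[i32], head: usize, len: usize) -> (&[i32], &[i32]) {
    let cap = buf.len();
    let first = len.min(cap - head);
    (&buf[head..head + first], &buf[..len - first])
}

fn main() {
    // Capacity 8; 5 readable items starting at index 6, so the region wraps.
    let buf = [10, 11, 2, 3, 4, 5, 6, 7];
    let (a, b) = read_regions(&buf, 6, 5);
    assert_eq!(a, &[6, 7]);
    assert_eq!(b, &[10, 11, 2]);
}
```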
Yes, thanks, that looks very nice! Please excuse my ignorance, but I have several more questions. One open question for me is whether there should be additional convenience functions. Getting those pairs of slices is the most important functionality because it is the lowest-level interface, but probably there should also be convenience functions like the batch operations proposed earlier. As I said, I'm a Rust beginner, so I cannot really tell which interface makes sense and which doesn't. I'm also not sure if functions for querying the available space and length are needed at all. Another open question is the `Default` bound - TBH, I didn't know that trait before. Finally, I'm wondering whether the single-element and the slice-based operations should live on the same types.
You could do that with the standard iterator API:

```rust
let (s1, s2) = p.write_slices();
let iter = s1.iter_mut().chain(s2.iter_mut());
for slot in iter {
    // do something
}
p.advance(/* some number */);
```

Or with a slightly different API, similar to one of those I proposed before:

```rust
let mut batch = p.batch();
for v in my_values_to_push {
    batch.push(v);
}
drop(batch); // advance the write index by the number of pushed items
```
I just assumed those functions are not needed because you can compute them from the lengths of the slices returned by `write_slices()`/`read_slices()`.
We would zero-initialize the whole buffer inside the constructor. Note that we can't initialize with `mem::zeroed()` for arbitrary types, since all-zero bytes are not a valid value of every type - that's where the `Default` bound comes in. Fortunately, almost every piece of data that implements `Copy` also implements `Default`. If a type doesn't, it simply can't use the slice-based API (plain `push`/`pop` still works). So yeah, it's really just a bunch of boring and pedantic safety stuff. But the overall effect on performance is zero in 99% of cases and minimal in the remaining 1%. So no big deal.
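A tiny illustration of that constructor-side initialization (editor's sketch, with hypothetical names):

```rust
// Pre-fill the whole buffer with T::default() so that handing out
// &mut [T] write slices never exposes uninitialized memory.
fn make_buffer<T: Default + Clone>(cap: usize) -> Vec<T> {
    vec![T::default(); cap]
}

fn main() {
    let buf: Vec<f32> = make_buffer(4);
    assert_eq!(buf, [0.0; 4]);
}
```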
Fortunately, writing into an already-initialized `&mut [T]` is perfectly safe. Again, more boring pedantic stuff. We're just making sure that if the user really attempts to do something silly like read from the slices returned by `write_slices()` without writing to them first, they can only observe valid values rather than uninitialized memory.
It's probably true that one would either use single-value or batched operations. But does it hurt if we support both? If we support both, users who only need one style can simply ignore the other. My feeling is that it makes sense to have different types only if supporting both kinds of operations simultaneously somehow hurts performance or increases the risk of bugs. But that doesn't seem true to me. Is my thinking correct here?
Based on your comments above, I thought those problems could be solved! I was expecting something like this (which you suggested in #338 (comment)):

```rust
impl<T: Copy + Default> Producer<T> {
    fn write_slices(&mut self) -> (&mut [T], &mut [T]);
    fn advance(&self, n: usize);
}
```

And I'm missing this on the consumer side:

```rust
impl<T: Copy> Consumer<T> {
    fn advance(&self, n: usize);
}
```

The big (and I guess only) difference to your original suggestion is … - I have no opinion on that. I was a bit confused by the seemingly unnecessary duplication of …, though. Just to make sure my understanding is correct: …
Finally, I think your `PopBatch` could implement `Iterator`:

```rust
impl<T> Iterator for PopBatch<'_, T> {
    type Item = T;
    fn next(&mut self) -> Option<Self::Item>;
}
```
Ugh, sorry! Yeah, with …. I've just created a new document where we can write the code together and add comments. Might be easier than going back and forth on GitHub. In this PR, I'll just add the minimal possible interface (push/pop only) and then we can follow up with a new PR that adds more features to it.
I'm just omitting …
If we don't make these methods `unsafe`, they have to validate their arguments at runtime. But if we make them `unsafe`, the caller becomes responsible for upholding the invariants. If a method can be misused to cause undefined behavior, it has to be marked `unsafe`.
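As an illustration of the checked (non-`unsafe`) variant, here is an editor's sketch with hypothetical names:

```rust
// Advance a ring-buffer index by n, refusing to move past the region
// that the previously returned slices covered.
fn advance(index: usize, n: usize, available: usize, cap: usize) -> usize {
    assert!(n <= available, "cannot advance past the end of the region");
    (index + n) % cap
}

fn main() {
    // Index 6 in a buffer of capacity 8, with 5 slots available:
    // advancing by 4 wraps around to index 2.
    assert_eq!(advance(6, 4, 5, 8), 2);
}
```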
I agree.
Correct.
Yes.
Yes. By the way, it's worth mentioning that …
Thanks, it looks great! I can't wait to try out this API!
OK, cool. And thanks for the explanation about …
All right, I have slimmed down the API (removed …). After that I'll follow up with several PRs introducing the new methods we have discussed. @mgeier By the way, in case you live in Berlin, I'll be at the co.up coworking space in Adalbertstraße tomorrow. :)
@stjepang It seems awesome. Thanks.
```rust
/// The queue capacity.
cap: usize,

/// Indicates that dropping a `Buffer<T>` may drop elements of type `T`.
```
`Buffer` -> `Inner`
Sorry if that's a naive question: Is it possible to make the ring buffer …? I would like to use it in an FFI function within …
In case somebody is interested in a use case for the SPSC ring buffer, I'm using it in https://github.com/AudioSceneDescriptionFormat/asdf-rust/blob/master/src/streamer.rs. In this project I need ring buffers for both use cases: transferring blocks of audio data (where the "slices" API would help) and transferring single elements.

Since the "slices" API is not yet available, I couldn't directly implement the first case and had to come up with a somewhat complicated work-around. What about merging this PR and moving forward with the "slices" API? What is the timeline of the previously discussed re-structuring of the modules/structs/functions in `crossbeam-queue`? If the re-structuring takes some more time, what about merging this PR now and doing the re-structuring later together with everything else? AFAICT we've reached consensus regarding the names of most functions and structs in this PR; the "only" open questions concern the module hierarchy and the "constructor" functions, right?

UPDATE (April 2021): In the meantime, I've updated the code (AudioSceneDescriptionFormat/asdf-rust@cd5fc8d) to use the new API I've introduced in https://crates.io/crates/rtrb, which simplifies things a lot.
An extension of this work, enhanced with batch operations on slices, can be found here: https://github.com/uklotzde/crossbeam/tree/spsc2

Some background: I'm a developer on the Mixxx team. Recently I came up with a new SPSC queue implementation to replace our naive C++ implementation and the error-prone PortAudio C code. The enthusiasm for my initiative was limited, so I decided to abandon the C++ work. Since I didn't find a versatile Rust implementation, I ported and extended my code. Publishing another implementation in a separate crate would further fragment the ecosystem, and crossbeam seems to be the right place for those concurrency tools to reside. I've tried to closely follow the existing design in this PR, with some notable exceptions: …

I didn't want to open a competing PR and instead am asking for your feedback here. It is still work in progress.
```diff
@@ -142,7 +142,7 @@ unsafe impl<T: Send> Send for SegQueue<T> {}
 unsafe impl<T: Send> Sync for SegQueue<T> {}

 impl<T> SegQueue<T> {
-    /// Creates a new unbounded queue.
+    /// Creates a unbounded queue.
```
```diff
- /// Creates a unbounded queue.
+ /// Creates an unbounded queue.
```
Any update on this PR?
Yeah, I really want this in. Updates would be nice.
I'd also be happy to help out here - what needs to be done?
@stjepang are you still planning to land this PR?
@stjepang I'm happy to help as well. This design is very close to two other super useful designs, and the wrappers could all share the same data structure: …

I'm happy to help with both of these features.
Also interested in seeing this land in crossbeam and can give time to make it happen...
I just discovered an unrelated (AFAIK) implementation of a lock-free SPSC ringbuffer: https://crates.io/crates/ringbuf. Any opinions on that? [UPDATE: I tried a few quick benchmarks and this PR is significantly faster than `ringbuf`.]
OK, since there doesn't seem to be much movement here, I decided to take things into my own hands ... With the permission of @stjepang, I grabbed the code from this PR and shoved it into a new repo: https://github.com/mgeier/rtrb. I then implemented some of the things discussed in the comments above and in a few Dropbox Paper documents, and I came up with a few additional API tweaks. I was trying not to cause any performance regressions compared to the code in this PR; I hope I was successful. If you are interested, please have a look: https://github.com/mgeier/rtrb. The code in this PR is of course very good - I hope I didn't butcher it too much, but I've certainly introduced some bugs. I'm very much open to further suggestions regarding the API and/or implementation. Please feel free to create issues and PRs on my aforementioned repo. If there are not too many objections, I will publish this in the next few days as a package on https://crates.io.
It took me a few more days than expected, but finally I've released my new ring buffer (based on this very PR): https://crates.io/crates/rtrb. The API documentation, as usual, is available at https://docs.rs/rtrb. I'd love to hear some feedback, but please use https://github.com/mgeier/rtrb in order to not spam the comments of this PR.
Hi, is there any reason why this has stalled?
Crossbeam is largely stalled. |
That is unfortunate. Is there any insight as to why it's stalled, and if it will eventually be revitalized? |
@likeabbas, the work in this PR lives in a separate crate: rtrb, see this comment above. See also this discussion about some upcoming changes.
Inspired by the Rust 2019: Rust Audio thread, I decided to add a high-performance wait-free bounded SPSC queue. This is the most common queue type in audio programming, and it's very important that it's wait-free and as fast as possible.
cc @Engid @raphlinus - pinging you in case you're interested.
Simple benchmarks (`ArrayQueue` vs `SegQueue` vs `spsc`, smaller is better): [benchmark results not preserved in this extract]