
Cloned mpsc::Sender never blocks #403

Closed
jonhoo opened this issue Mar 6, 2017 · 18 comments

Comments

@jonhoo
Contributor

jonhoo commented Mar 6, 2017

I believe I have found a pretty serious bug in sync::mpsc::Sender. Consider the program below, which has one thread sending on a channel (waiting for each send to complete before starting the next) and one thread reading from that same channel. The sender is faster than the reader (due to the sleep in the receiver thread). This program runs correctly: it produces 1001 recv i lines followed by recv done, with done sending appearing somewhere near recv 11 (since the channel's effective capacity is 11: the buffer of 10 plus one slot for the sender).

extern crate futures;

use futures::{Future, Sink, Stream};

fn main() {
    use std::time;
    use std::thread;
    // Bounded channel with a buffer of 10.
    let (mut tx, rx) = futures::sync::mpsc::channel(10);

    // Receiver thread: sleeps 1ms per item, so it is slower than the sender.
    let jh = thread::spawn(move || {
        rx.for_each(|x| {
                thread::sleep(time::Duration::from_millis(1));
                println!("recv {}", x);
                if x == 0 {
                    // Returning an error terminates the for_each stream.
                    futures::future::err(())
                } else {
                    futures::future::ok(())
                }
            })
            .wait()
            .is_err()
    });

    // Send 1000, 999, ..., 1, reusing the Sender returned by each completed send.
    let n = 1_000;
    for i in 0..n {
        tx = tx.send(n - i).wait().unwrap();
    }
    // Send the sentinel 0 that makes the receiver stop.
    tx.send(0).wait().unwrap();
    println!("done sending");

    jh.join().unwrap();
    println!("recv done");
}

Now try changing the line

tx = tx.send(n - i).wait().unwrap();

to

tx.clone().send(n - i).wait().unwrap();

Semantically, these should be equivalent; the only difference is that the latter clones the transmit handle before sending on the channel (blocking if necessary). However, the .send().wait() in the second version never blocks. The output looks something like

recv 1000
recv 999
done sending
recv 998
...
recv 0
recv done

That is, the channel suddenly behaves as if it is unbounded!

Interestingly, if the line is instead replaced with

tx = tx.clone().send(n - i).wait().unwrap();

the code reverts to the expected blocking behavior.

@alexcrichton
Member

cc @carllerche

Thanks for the report! Currently, however, I believe this is intended behavior. The backpressure scheme gives each sender a "free send", and typically the set of senders reaches a fixed-ish point, but clearly that isn't happening here!

@jonhoo
Contributor Author

jonhoo commented Mar 6, 2017

Oh, that's quite unexpected. Do the docs mention this anywhere? What's the reason for this "free send"?

@jonhoo
Contributor Author

jonhoo commented Mar 6, 2017

For what it's worth, the reason I ran into this is that I have an fn foo(&self) that needs to send on a channel, and since send consumes self, it's not entirely clear how else to do it (see the sketch below). In fact, I also have an fn bar(&mut self) elsewhere, and even there it's tricky: you end up needing an Option<Sender> plus .take(), putting the Sender back when the future resolves, which requires scoping the returned future to the lifetime of the mutable self borrow.
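
A minimal sketch of that &self situation, assuming a hypothetical Foo struct holding a Sender<u32> (the names are illustrative, not code from this project): cloning is the obvious way around the move, and it is exactly this clone-per-send pattern that defeats the backpressure reported above.

extern crate futures;

use futures::{Future, Sink};
use futures::sync::mpsc::Sender;

// Hypothetical struct: `send` consumes the Sender, so from `&self` the
// obvious escape hatch is to clone it first.
struct Foo {
    tx: Sender<u32>,
}

impl Foo {
    fn foo(&self, x: u32) {
        // Each clone brings its own guaranteed slot, which is what lets the
        // channel in the program above grow past its nominal bound.
        self.tx.clone().send(x).wait().expect("receiver dropped");
    }
}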

@jonhoo
Contributor Author

jonhoo commented Mar 6, 2017

So, the intended behavior of this code:

extern crate futures;

use futures::{Future, Sink};

fn main() {
    let (tx, rx) = futures::sync::mpsc::channel(10);

    // The receiver is never polled, yet every cloned Sender brings its own
    // guaranteed slot, so each iteration's send completes immediately and the
    // queue grows without bound.
    loop {
        if let Err(e) = tx.clone().send(0).wait() {
            println!(":( {:?}", e);
            break;
        }
    }
    tx.send(0).wait().unwrap();
    println!("done sending");
    drop(rx);
}

is for it to run out of memory? Despite the channel being bounded and of size 10?

@carllerche
Member

Yes, this behavior is documented as part of the channel fn:

This channel is unique in that it implements back pressure to ensure that the sender never outpaces the receiver. The channel capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

There are some implementation notes here: https://github.com/alexcrichton/futures-rs/blob/master/src/sync/mpsc/mod.rs#L30-L68

The short version of "why" is because it has to :)
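
To make the quoted capacity rule concrete, here is a minimal sketch (not from the thread) with a buffer of 2 and three Sender handles; under the documented semantics, all five sends below complete without the receiver ever running, and a sixth send on any handle would block.

extern crate futures;

use futures::{Future, Sink};

fn main() {
    // buffer = 2 and three senders, so the documented capacity is 2 + 3 = 5.
    let (tx, _rx) = futures::sync::mpsc::channel::<u32>(2);
    let tx2 = tx.clone();
    let tx3 = tx.clone();

    // The two "first come, first serve" buffer slots...
    let tx = tx.send(1).wait().unwrap();
    let tx = tx.send(2).wait().unwrap();
    // ...plus one guaranteed slot per sender.
    let _tx = tx.send(3).wait().unwrap();
    let _tx2 = tx2.send(4).wait().unwrap();
    let _tx3 = tx3.send(5).wait().unwrap();

    // A sixth send on any of these handles would block until _rx is read from.
    println!("5 sends completed without blocking");
}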

@jonhoo
Contributor Author

jonhoo commented Mar 6, 2017

Just saw this part of the docs for mpsc::channel

The channel capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

I think this is counter-intuitive behavior... What is the motivation for having this in the first place?

@jonhoo
Contributor Author

jonhoo commented Mar 6, 2017

Hehe, @carllerche beat me to it. Why does it have to?

EDIT: Even after reading through the implementation notes you linked, the claim that you need to know whether a send will succeed before sending seems strange to me. Clearly it is possible to have a Sender block (that's what it does on subsequent send()s), so why can't the same logic be applied to a first-time sender?

@46bit
Contributor

46bit commented Mar 7, 2017

This is an interesting case, and relevant to a communications library ("herding server clients") I'm working on - cheers @jonhoo. I've found the Option::take pattern to be quite a code smell when custom futures are already complex code.

It's not clear if you spotted this already, but something I had missed for a while is that a non-consuming send can be done using Sender::start_send and Sender::poll_complete (a sketch follows below). That doesn't magic these issues away, but I've found it leads to workable solutions with sufficient refactoring :)
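
A minimal sketch of that non-consuming path, assuming a hypothetical SendOne future (not part of the library) that borrows the Sender mutably and drives start_send / poll_complete from inside a task context:

extern crate futures;

use futures::{Async, AsyncSink, Future, Poll, Sink};
use futures::sync::mpsc::{SendError, Sender};

// Hypothetical future that sends one item without consuming the Sender.
struct SendOne<'a> {
    tx: &'a mut Sender<u32>,
    item: Option<u32>,
}

impl<'a> Future for SendOne<'a> {
    type Item = ();
    type Error = SendError<u32>;

    fn poll(&mut self) -> Poll<(), SendError<u32>> {
        if let Some(item) = self.item.take() {
            // start_send only borrows the Sender; if there is no room it hands
            // the item back and the current task is parked until space frees up.
            if let AsyncSink::NotReady(item) = self.tx.start_send(item)? {
                self.item = Some(item);
                return Ok(Async::NotReady);
            }
        }
        // Flush whatever was queued.
        self.tx.poll_complete()
    }
}

Driving such a future with .wait(), or polling it from inside another future, supplies the task context that start_send needs in order to park when the channel is full.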

jonhoo added a commit to mit-pdos/noria that referenced this issue Mar 7, 2017
@jonhoo
Contributor Author

jonhoo commented Mar 7, 2017

@46bit I actually ended up working around this by creating a wrapper struct that internally uses Option::take (roughly sketched below). It ends up with a somewhat nasty signature (taking an &'a mut self and returning an impl Future + 'a), but it is fairly straightforward.
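
A rough sketch of that shape, assuming a hypothetical SenderHandle wrapper and impl Trait return types (the names and details are illustrative, not the actual code from the commit referenced above):

extern crate futures;

use futures::{Future, Sink};
use futures::sync::mpsc::{SendError, Sender};

// Hypothetical wrapper around the Option::take workaround described above.
struct SenderHandle {
    tx: Option<Sender<u32>>,
}

impl SenderHandle {
    // The returned future borrows `self` mutably, so it must be driven to
    // completion before the handle can be used again.
    fn send<'a>(&'a mut self, x: u32) -> impl Future<Item = (), Error = SendError<u32>> + 'a {
        let tx = self.tx.take().expect("previous send still pending");
        // The consuming `send` hands the Sender back when it completes; put it
        // back into the Option so the handle stays usable.
        tx.send(x).map(move |tx| {
            self.tx = Some(tx);
        })
    }
}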

@carllerche
Member

@jonhoo the current strategy is used to avoid the thundering herd problem. If only one slot is available and there are a million blocked senders, they will all be woken up even though only one of them can succeed, and this repeats for every slot that becomes available.

Guaranteeing a slot for the sender avoids this problem.

I'm going to close the issue since this isn't a bug, but will keep an eye on further comments.

@jonhoo
Contributor Author

jonhoo commented Mar 7, 2017

@carllerche I don't see how the guaranteed slot fixes the problem. Say the reader isn't reading from the channel, and it fills up completely, including all the "free" slots for the million senders. Then the reader decides to read a single value. Why will the single slot that opens up not cause a thundering herd?

@carllerche
Member

In practice, it is a bit more complex. You can read the code here: https://github.com/alexcrichton/futures-rs/blob/master/src/sync/mpsc/mod.rs#L748-L778

It does work, though: popping one message will notify at most one sender task.
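
A deliberately simplified, thread-based model of that wake-one idea (this is not how the task-based mpsc code works internally, just an illustration of notifying a single waiter per freed slot):

use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread::{self, Thread};

// Hypothetical model: blocked senders register themselves in a queue, and the
// receiver unparks exactly one of them per popped message instead of waking
// the whole herd.
struct ParkedSenders {
    queue: Mutex<VecDeque<Thread>>,
}

impl ParkedSenders {
    fn park_current(&self) {
        self.queue.lock().unwrap().push_back(thread::current());
        // A real implementation would re-check its condition in a loop, since
        // park() can also return spuriously.
        thread::park();
    }

    // Called by the receiver after popping one message.
    fn unpark_one(&self) {
        if let Some(t) = self.queue.lock().unwrap().pop_front() {
            t.unpark();
        }
    }
}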

@jonhoo
Contributor Author

jonhoo commented Mar 7, 2017

I've written similar lockless queue code in the past, and I guess I'm having a hard time recognizing why unpark_one cannot also be used without these extra slots, but I'll take your word for it.

Taking a step back, this strikes me as fairly unintuitive behavior for a "FIFO queue with back pressure", since under some conditions it simply does not provide back pressure at all. The docs also only mention this behavior in a single place, and arguably the place you are least likely to look; I suspect few people look up the docs for mpsc::channel, since they assume it works the same way as sync::mpsc::sync_channel (at least that was the case for me). Maybe a description both under its own heading at futures::sync::mpsc and on Sender would be appropriate? The first line of mpsc::channel should also change to reflect the fact that the channel is not actually bounded, so that this information appears on the front page of mpsc, where users are most likely to end their search ("how do I make a bounded futures channel?").

Personally, I would also prefer channel to be renamed to indicate more clearly that the channel is only partially bounded, something along the lines of mpsc::per_sink_bounded? Though I realize that's a breaking, and more controversial, change. I guess I feel so strongly about this because the behavior came as a complete surprise to me and was extremely hard to debug. I concede that this may be giving me a somewhat biased view :)

@carllerche
Member

Something to keep in mind is that, in the async world of futures, a task cannot "block" in the critical section. This means that when a sender is notified, there is no actual guarantee that the task that was notified will ever touch the sender.

I'm sure a PR adding docs in the mpsc section would be happily accepted, so feel free to try to write something up.

I empathize with the confusion, but at this point I don't think a breaking change is worth it.

@jonhoo
Contributor Author

jonhoo commented Mar 7, 2017

Okay, I'll see if I get some time this week to write up a PR!

I might also end up taking a stab at writing a "real" bounded channel using a lockless circular buffer, but that's for another repo and for another day.

@carllerche
Member

Sounds good, I would be happy to be proven wrong :)

I would suggest reading the comment thread in #305 and the other issues related to Shared. It suffers from a similar problem.

@jonhoo
Contributor Author

jonhoo commented Mar 7, 2017

Thanks — I'll check it out. I probably won't submit a PR changing the underlying implementation, but a documentation PR should be doable. If I do write up a new bounded mpsc implementation in another crate, I'll link it from here.

Another note about the bounds: I don't think the claim about buffer + num_senders is actually true. Take the code in #403 (comment) for example. There are only ever 2 senders alive at a time (the original and the current clone), and the buffer bound is 10, yet the queue grows without bound. This is because the effective capacity is really the buffer size plus the number of senders there have ever been.

@carllerche
Member

If you found a bug, a failing test PR would be super helpful.
