Concurrency bugs in mpsc's stream and shared mode #94518

Closed
grelative opened this issue Mar 2, 2022 · 2 comments
Labels
A-concurrency Area: Concurrency C-bug Category: This is a bug. T-libs Relevant to the library team, which will review and decide on the PR/issue.

Comments

@grelative

Case 1

If a reader waiting in recv_timeout() wakes up because of a timeout, it first calls abort_selection() to clear the signal_token, then does another try_recv(), and returns whatever try_recv() got. However, if a send() operation happens after the reader wakes up and finishes before that try_recv() executes, the try_recv() will return the data from that send(), and the next call to recv() will panic.

use std::{sync::mpsc, thread, time};

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx = tx.clone();

    let _ = thread::spawn(move||{
        thread::sleep(time::Duration::from_millis(200));
        tx.send(1).unwrap();
        println!("send finished");
    });

    println!("recv 1: {:?}", rx.recv_timeout(time::Duration::from_millis(100)));
    println!("recv 2: {:?}", rx.recv());
}

To reproduce the problem, we have to arrange for the send() to happen exactly after the recv_timeout() wakes up and to finish before the recv_timeout() returns. That can be done with a debugger's help.

(gdb) set non-stop on
(gdb) set env RUST_BACKTRACE 1
(gdb) break std::sync::mpsc::shared::Packet<T>::abort_selection
Breakpoint 1 at 0xb622: file /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs, line 450.
(gdb) run
Starting program: 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff7d82700 (LWP 20921)]

Thread 1 "channel" hit Breakpoint 1, std::sync::mpsc::shared::Packet<T>::abort_selection (self=0x5555555c4a40,
    _was_upgrade=false) at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs:450
450                 let _guard = self.select_lock.lock().unwrap();
send finished
[Thread 0x7ffff7d82700 (LWP 20921) exited]

(gdb) continue
Continuing.
recv 1: Ok(1)
thread 'main' panicked at 'internal error: entered unreachable code', /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/mod.rs:1175:43
stack backtrace:
   0: rust_begin_unwind
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:498:5
   1: core::panicking::panic_fmt
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/panicking.rs:107:14
   2: core::panicking::panic
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/panicking.rs:48:5
   3: std::sync::mpsc::Receiver<T>::recv
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/mod.rs:1175:43
   4: channel::main
             at ./src/bin/channel.rs:14:30
   5: core::ops::function::FnOnce::call_once
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
[Inferior 1 (process 20879) exited with code 0145]

The recv() panics because decrement() finds that cnt (2) is greater than steals (1), which means the queue should not be empty and the following try_recv() is guaranteed to return some data. However, the queue is actually empty; this inconsistency sends the code down a path that should be impossible, and it panics.

The relation between cnt and steals seems to be this: if a sender sends an element successfully, it must increase cnt; if a reader receives an element successfully, it either increases steals or decreases cnt; if the receive fails, it must not modify steals or cnt (or must undo whatever modification it made). This guarantees that when cnt <= steals the queue is definitely empty, and when cnt > steals the queue is definitely non-empty.
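
To make that relation concrete, here is a minimal, self-contained model of the bookkeeping (this is not the library's code, only an illustration of the invariant and of the state that case 1 leaves behind):

use std::sync::atomic::{AtomicIsize, Ordering};

fn main() {
    // `cnt` counts successful sends; `steals` counts messages the reader
    // consumed without blocking. `cnt > steals` is treated as proof that
    // the queue still holds data.
    let cnt = AtomicIsize::new(0);
    let steals: isize = 0;

    // Sender: send(1) succeeds and bumps cnt.
    cnt.fetch_add(1, Ordering::SeqCst);

    // Reader, in the buggy interleaving of case 1: the second try_recv()
    // inside recv_timeout() consumes the message, but neither increases
    // steals nor decreases cnt.

    // A later recv() now concludes the queue must be non-empty, even though
    // it is empty, and walks into the unreachable!() path.
    assert!(cnt.load(Ordering::SeqCst) > steals);
    println!("invariant claims data is queued, but the queue is empty");
}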

Every recv

pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
    // This code is essentially the exact same as that found in the stream
    // case (see stream.rs)
    match self.try_recv() {
        Err(Empty) => {}
        data => return data,
    }
    let (wait_token, signal_token) = blocking::tokens();
    if self.decrement(signal_token) == Installed {
        if let Some(deadline) = deadline {
            let timed_out = !wait_token.wait_max_until(deadline);
            if timed_out {
                self.abort_selection(false);
            }
        } else {
            wait_token.wait();
        }
    }
    match self.try_recv() {
        data @ Ok(..) => unsafe {
            *self.steals.get() -= 1;
            data
        },
        data => data,
    }
}
will call try_recv() twice. If the first try_recv() succeeds, it increases steals; if it returns Empty, the reader decreases cnt in decrement()

match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {

and waits for a wakeup, after which it calls try_recv() again. Normally the reader is woken up because some element has been sent, so the second try_recv() returns an element successfully; but when the reader is woken up by a timeout, the second try_recv() will find nothing, so the reader has to increase cnt back in abort_selection()

let prev = self.bump(steals + 1);

The bug hides here: if a send() happens after the wakeup and finishes before the second try_recv(), that try_recv() will succeed! But in that case neither steals is increased nor cnt is decreased, so the consistency between them is broken. The next recv() discovers this inconsistency and panics.

fix

This problem can be fixed by adding a return after the call to abort_selection

self.abort_selection(false);

Since this reader was woken up because of a timeout, it does not make sense to do another try_recv(); we can return an Err(Timeout) directly.
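
A sketch of the shape of that fix against the recv() excerpt above (the exact error value to return from shared.rs is a guess here; the intent is only that the reader bails out after abort_selection instead of falling through to the second try_recv, and the caller then reports a timeout):

if self.decrement(signal_token) == Installed {
    if let Some(deadline) = deadline {
        let timed_out = !wait_token.wait_max_until(deadline);
        if timed_out {
            self.abort_selection(false);
            // proposed: return here so a send() racing with the timeout
            // cannot be consumed without updating steals/cnt
            return Err(Empty);
        }
    } else {
        wait_token.wait();
    }
}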

case 2

use std::{sync::mpsc, thread, time};

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    let thread = thread::spawn(move ||{
        let _ = tx.clone();
        thread::sleep(time::Duration::from_secs(2));
    });

    println!("recv 1: {:?}", rx.recv_timeout(time::Duration::from_secs(1)));
    thread.join().unwrap();
}

This problem also needs a debugger to reproduce.

(gdb) set non-stop on
(gdb) set environment RUST_BACKTRACE 1
(gdb) break std::sync::mpsc::shared::Packet<T>::abort_selection
Breakpoint 1 at 0x16332: file /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs, line 450.
(gdb) break std::sync::mpsc::shared::Packet<T>::take_to_wake
Breakpoint 2 at 0x1622e: file /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs, line 417.
(gdb) run
Starting program: 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff7d82700 (LWP 23113)]

Thread 1 "channel2" hit Breakpoint 1, std::sync::mpsc::shared::Packet<T>::abort_selection (self=0x7ffff0000bf0,
    _was_upgrade=false) at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs:450
450                 let _guard = self.select_lock.lock().unwrap();
(gdb)
Thread 2 "channel2" hit Breakpoint 2, std::sync::mpsc::shared::Packet<T>::take_to_wake (self=0x7ffff0000bf0)
    at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs:417
417             let ptr = self.to_wake.load(Ordering::SeqCst);

(gdb) continue
Continuing.
thread 'main' panicked at 'assertion failed: `(left == right)`
  left: `93824992685056`,
 right: `0`', /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs:465:13
stack backtrace:
   0: rust_begin_unwind
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:498:5
   1: core::panicking::panic_fmt
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/panicking.rs:107:14
   2: core::panicking::assert_failed_inner
   3: core::panicking::assert_failed
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/panicking.rs:145:5
   4: std::sync::mpsc::shared::Packet<T>::abort_selection
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs:465:13
   5: std::sync::mpsc::shared::Packet<T>::recv
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/shared.rs:231:21
   6: std::sync::mpsc::Receiver<T>::recv_deadline
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/mod.rs:1357:48
   7: std::sync::mpsc::Receiver<T>::recv_timeout
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sync/mpsc/mod.rs:1276:35
   8: channel2::main
             at ./src/bin/channel2.rs:11:30
   9: core::ops::function::FnOnce::call_once
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
[Thread 0x7ffff7d82700 (LWP 23113) exited]
[Inferior 1 (process 23071) exited with code 0145]

This assert

let prev = self.bump(steals + 1);
if prev == DISCONNECTED {
    assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
    true
} else {
in abort_selection caused the panic. It assumes that if the reader sees DISCONNECTED (which means all senders have been dropped), the signal token (to_wake) must already have been cleared by the destructor. That is not true: in drop_chan (the destructor of the sender side), to_wake is cleared only after cnt has been set to DISCONNECTED:
match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
    -1 => {
        self.take_to_wake().signal();
    }
    DISCONNECTED => {}
    n => {
        assert!(n >= 0);
    }
}

If the reader's loads of cnt and to_wake happen after L375 (the swap to DISCONNECTED) but finish before L377 (take_to_wake), the reader sees DISCONNECTED while to_wake is still non-zero, and its assertion won't hold.
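
A toy model of that window, using plain atomics instead of the real channel internals (illustration only, not the library code); depending on scheduling, the reader can observe DISCONNECTED while to_wake is still non-zero:

use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const DISCONNECTED: isize = isize::MIN;

fn main() {
    let cnt = Arc::new(AtomicIsize::new(-1)); // -1: a reader is blocked
    let to_wake = Arc::new(AtomicUsize::new(0xdead)); // non-zero: signal token installed

    let sender = {
        let (cnt, to_wake) = (Arc::clone(&cnt), Arc::clone(&to_wake));
        thread::spawn(move || {
            // drop_chan(): publish DISCONNECTED first ...
            let prev = cnt.swap(DISCONNECTED, Ordering::SeqCst);
            if prev == -1 {
                // ... and only afterwards take and "signal" the token.
                to_wake.swap(0, Ordering::SeqCst);
            }
        })
    };

    // Reader in abort_selection(): the assertion claims that seeing
    // DISCONNECTED implies to_wake == 0; in the window between the two
    // sender steps above, that does not hold.
    if cnt.load(Ordering::SeqCst) == DISCONNECTED {
        println!("to_wake while DISCONNECTED: {:#x}", to_wake.load(Ordering::SeqCst));
    }

    sender.join().unwrap();
}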

fix

The fix would be to simply remove this assertion. Once cnt has switched to DISCONNECTED (meaning all senders have been dropped), later recv operations will not wait for a sender anymore, so the reader does not need to worry about a sender waking up a mismatched signal token, the way it does when it discovers that cnt >= 0 and spins until to_wake is cleared:

if prev < 0 {
    drop(self.take_to_wake());
} else {
    while self.to_wake.load(Ordering::SeqCst) != 0 {
        thread::yield_now();
    }
}

note

Because mpsc's stream mode code is basically the same as the shared mode code, the stream mode suffers from these problems too.

rust version: rustc 1.58.1 (db9d1b20b 2022-01-20)

@grelative grelative added the C-bug Category: This is a bug. label Mar 2, 2022
@the8472
Member

the8472 commented Mar 2, 2022

The current implementation of mpsc is in the process of being replaced (#93563) due to maintainability issues and a long-standing bug that seems related(?) #39364.
That would render this issue moot.

@the8472 the8472 added A-concurrency Area: Concurrency T-libs Relevant to the library team, which will review and decide on the PR/issue. labels Mar 2, 2022
@goffrie
Contributor

goffrie commented Jan 20, 2023

This can be closed given that #93563 landed.

@tmiasko tmiasko closed this as completed Jan 20, 2023