Skip to content

Commit

Permalink
Fix deadlock when flushing messages (#843)
Browse files Browse the repository at this point in the history
* Drop msgs on their own thread

* Release the GIL before flush

* Better expect msg

* clippy
  • Loading branch information
jleibs authored Jan 20, 2023
1 parent 8ee3dfa commit 3b5f4f3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
42 changes: 38 additions & 4 deletions crates/re_sdk_comms/src/buffered_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Client {
flushed_rx: Receiver<FlushedMsg>,
encode_quit_tx: Sender<QuitMsg>,
send_quit_tx: Sender<QuitMsg>,
drop_quit_tx: Sender<QuitMsg>,
}

impl Default for Client {
Expand All @@ -45,19 +46,29 @@ impl Client {
// TODO(emilk): keep track of how much memory is in each pipe
// and apply back-pressure to not use too much RAM.
let (msg_tx, msg_rx) = crossbeam::channel::unbounded();
let (msg_drop_tx, msg_drop_rx) = crossbeam::channel::unbounded();
let (packet_tx, packet_rx) = crossbeam::channel::unbounded();
let (flushed_tx, flushed_rx) = crossbeam::channel::unbounded();
let (encode_quit_tx, encode_quit_rx) = crossbeam::channel::unbounded();
let (send_quit_tx, send_quit_rx) = crossbeam::channel::unbounded();
let (drop_quit_tx, drop_quit_rx) = crossbeam::channel::unbounded();

std::thread::Builder::new()
.name("msg_encoder".into())
.spawn(move || {
msg_encode(&msg_rx, &encode_quit_rx, &packet_tx);
msg_encode(&msg_rx, &msg_drop_tx, &encode_quit_rx, &packet_tx);
re_log::debug!("Shutting down msg encoder thread");
})
.expect("Failed to spawn thread");

std::thread::Builder::new()
.name("msg_dropper".into())
.spawn(move || {
msg_drop(&msg_drop_rx, &drop_quit_rx);
re_log::debug!("Shutting down msg dropper thread");
})
.expect("Failed to spawn thread");

std::thread::Builder::new()
.name("tcp_sender".into())
.spawn(move || {
Expand All @@ -71,6 +82,7 @@ impl Client {
flushed_rx,
encode_quit_tx,
send_quit_tx,
drop_quit_tx,
}
}

Expand Down Expand Up @@ -112,32 +124,54 @@ impl Drop for Client {
self.flush();
self.encode_quit_tx.send(QuitMsg).ok();
self.send_quit_tx.send(QuitMsg).ok();
self.drop_quit_tx.send(QuitMsg).ok();
re_log::debug!("Sender has shut down.");
}
}

/// Receives messages and lets them go out of scope on the calling thread.
///
/// Messages are dropped on a thread of their own because, with the PyO3 +
/// Arrow memory model, a message can in some cases store pointers back to
/// Python-managed memory. Releasing such a message may have to wait for the
/// GIL, and the send-thread must not be blocked on that.
///
/// Returns as soon as `msg_drop_rx` is closed or anything arrives on `quit_rx`.
fn msg_drop(msg_drop_rx: &Receiver<MsgMsg>, quit_rx: &Receiver<QuitMsg>) {
    loop {
        select! {
            recv(msg_drop_rx) -> received => {
                match received {
                    // Nothing to do: the message is dropped when this arm ends.
                    Ok(_msg) => {}
                    // channel has closed
                    Err(_) => return,
                }
            }
            recv(quit_rx) -> _quit_msg => {
                return;
            }
        }
    }
}

fn msg_encode(
msg_rx: &Receiver<MsgMsg>,
msg_drop_tx: &Sender<MsgMsg>,
quit_rx: &Receiver<QuitMsg>,
packet_tx: &Sender<PacketMsg>,
) {
loop {
select! {
recv(msg_rx) -> msg_msg => {
if let Ok(msg_msg) = msg_msg {
let packet_msg = match msg_msg {
let packet_msg = match &msg_msg {
MsgMsg::LogMsg(log_msg) => {
let packet = crate::encode_log_msg(&log_msg);
let packet = crate::encode_log_msg(log_msg);
re_log::trace!("Encoded message of size {}", packet.len());
PacketMsg::Packet(packet)
}
MsgMsg::SetAddr(new_addr) => PacketMsg::SetAddr(new_addr),
MsgMsg::SetAddr(new_addr) => PacketMsg::SetAddr(*new_addr),
MsgMsg::Flush => PacketMsg::Flush,
};

packet_tx
.send(packet_msg)
.expect("tcp_sender thread should live longer");
msg_drop_tx.send(msg_msg).expect("Main thread should still be alive");
} else {
return; // channel has closed
}
Expand Down
14 changes: 9 additions & 5 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,15 @@ fn serve() -> PyResult<()> {
}

#[pyfunction]
fn shutdown() {
re_log::debug!("Shutting down the Rerun SDK");
let mut session = global_session();
session.flush();
session.disconnect();
fn shutdown(py: Python<'_>) {
    // Flushing may need to clean up a Python object, so the whole teardown
    // (log line, flush, disconnect) runs with the GIL released.
    let teardown = || {
        re_log::debug!("Shutting down the Rerun SDK");
        let mut session = global_session();
        session.flush();
        session.disconnect();
    };
    py.allow_threads(teardown);
}

/// Disconnect from remote server (if any).
Expand Down

1 comment on commit 3b5f4f3

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 3b5f4f3 Previous: 8ee3dfa Ratio
datastore/insert/batch/rects/insert 568925 ns/iter (± 4770) 559926 ns/iter (± 4903) 1.02

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.