From 2ce64d4c91ef9692c8a820af49243103f9ad915c Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 20 Jan 2023 11:07:11 +0100 Subject: [PATCH 1/4] Drop msgs on their own thread --- crates/re_sdk_comms/src/buffered_client.rs | 43 ++++++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index e35661d91670..3766c6ba5429 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -32,6 +32,7 @@ pub struct Client { flushed_rx: Receiver, encode_quit_tx: Sender, send_quit_tx: Sender, + drop_quit_tx: Sender, } impl Default for Client { @@ -45,19 +46,29 @@ impl Client { // TODO(emilk): keep track of how much memory is in each pipe // and apply back-pressure to not use too much RAM. let (msg_tx, msg_rx) = crossbeam::channel::unbounded(); + let (msg_drop_tx, msg_drop_rx) = crossbeam::channel::unbounded(); let (packet_tx, packet_rx) = crossbeam::channel::unbounded(); let (flushed_tx, flushed_rx) = crossbeam::channel::unbounded(); let (encode_quit_tx, encode_quit_rx) = crossbeam::channel::unbounded(); let (send_quit_tx, send_quit_rx) = crossbeam::channel::unbounded(); + let (drop_quit_tx, drop_quit_rx) = crossbeam::channel::unbounded(); std::thread::Builder::new() .name("msg_encoder".into()) .spawn(move || { - msg_encode(&msg_rx, &encode_quit_rx, &packet_tx); + msg_encode(&msg_rx, &msg_drop_tx, &encode_quit_rx, &packet_tx); re_log::debug!("Shutting down msg encoder thread"); }) .expect("Failed to spawn thread"); + std::thread::Builder::new() + .name("msg_dropper".into()) + .spawn(move || { + msg_drop(&msg_drop_rx, &drop_quit_rx); + re_log::debug!("Shutting down msg dropper thread"); + }) + .expect("Failed to spawn thread"); + std::thread::Builder::new() .name("tcp_sender".into()) .spawn(move || { @@ -71,6 +82,7 @@ impl Client { flushed_rx, encode_quit_tx, send_quit_tx, + drop_quit_tx, } } @@ -112,12 +124,34 @@ impl Drop for Client { self.flush(); self.encode_quit_tx.send(QuitMsg).ok(); self.send_quit_tx.send(QuitMsg).ok(); + self.drop_quit_tx.send(QuitMsg).ok(); re_log::debug!("Sender has shut down."); } } +// We drop messages in a separate thread because the PyO3 + Arrow memory model +// means in some cases these messages actually store pointers back to +// python-managed memory. We don't want to block our send-thread waiting for the +// GIL. +fn msg_drop(msg_drop_rx: &Receiver, quit_rx: &Receiver) { + loop { + select! { + recv(msg_drop_rx) -> msg_msg => { + if let Ok(_) = msg_msg { + } else { + return; // channel has closed + } + } + recv(quit_rx) -> _quit_msg => { + return; + } + } + } +} + fn msg_encode( msg_rx: &Receiver, + msg_drop_tx: &Sender, quit_rx: &Receiver, packet_tx: &Sender, ) { @@ -125,19 +159,20 @@ fn msg_encode( select! { recv(msg_rx) -> msg_msg => { if let Ok(msg_msg) = msg_msg { - let packet_msg = match msg_msg { + let packet_msg = match &msg_msg { MsgMsg::LogMsg(log_msg) => { - let packet = crate::encode_log_msg(&log_msg); + let packet = crate::encode_log_msg(log_msg); re_log::trace!("Encoded message of size {}", packet.len()); PacketMsg::Packet(packet) } - MsgMsg::SetAddr(new_addr) => PacketMsg::SetAddr(new_addr), + MsgMsg::SetAddr(new_addr) => PacketMsg::SetAddr(new_addr.clone()), MsgMsg::Flush => PacketMsg::Flush, }; packet_tx .send(packet_msg) .expect("tcp_sender thread should live longer"); + msg_drop_tx.send(msg_msg).expect("Foo"); } else { return; // channel has closed } From 1a3f933da0868b55f7a1696177a1b1494598733c Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 20 Jan 2023 11:07:21 +0100 Subject: [PATCH 2/4] Relesae the GIL before flush --- rerun_py/src/python_bridge.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index ae1d5294c256..2d29ee7d88c9 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -325,11 +325,15 @@ fn serve() -> PyResult<()> { } #[pyfunction] -fn shutdown() { - re_log::debug!("Shutting down the Rerun SDK"); - let mut session = global_session(); - session.flush(); - session.disconnect(); +fn shutdown(py: Python) { + // Release the GIL in case any flushing behavior needs to + // cleanup a python object. + py.allow_threads(|| { + re_log::debug!("Shutting down the Rerun SDK"); + let mut session = global_session(); + session.flush(); + session.disconnect(); + }); } /// Disconnect from remote server (if any). From f2d9b4e92b73c66d3510322299e35769964fb9fe Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 20 Jan 2023 11:18:00 +0100 Subject: [PATCH 3/4] Better expect msg --- crates/re_sdk_comms/src/buffered_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 3766c6ba5429..039965e59aee 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -172,7 +172,7 @@ fn msg_encode( packet_tx .send(packet_msg) .expect("tcp_sender thread should live longer"); - msg_drop_tx.send(msg_msg).expect("Foo"); + msg_drop_tx.send(msg_msg).expect("Main thread should still be alive"); } else { return; // channel has closed } From 26c9c66c573517f9318764ee8df4381823ad87c0 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 20 Jan 2023 13:43:31 +0100 Subject: [PATCH 4/4] clippy --- crates/re_sdk_comms/src/buffered_client.rs | 5 ++--- rerun_py/src/python_bridge.rs | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 039965e59aee..95c3dcaa2cbb 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -137,8 +137,7 @@ fn msg_drop(msg_drop_rx: &Receiver, quit_rx: &Receiver) { loop { select! { recv(msg_drop_rx) -> msg_msg => { - if let Ok(_) = msg_msg { - } else { + if msg_msg.is_err() { return; // channel has closed } } @@ -165,7 +164,7 @@ fn msg_encode( re_log::trace!("Encoded message of size {}", packet.len()); PacketMsg::Packet(packet) } - MsgMsg::SetAddr(new_addr) => PacketMsg::SetAddr(new_addr.clone()), + MsgMsg::SetAddr(new_addr) => PacketMsg::SetAddr(*new_addr), MsgMsg::Flush => PacketMsg::Flush, }; diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 2d29ee7d88c9..7627189e8568 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -325,7 +325,7 @@ fn serve() -> PyResult<()> { } #[pyfunction] -fn shutdown(py: Python) { +fn shutdown(py: Python<'_>) { // Release the GIL in case any flushing behavior needs to // cleanup a python object. py.allow_threads(|| {