From 61c23afbe911f46629cd084bf5a129385e47321e Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 2 Jan 2024 09:22:44 +0100 Subject: [PATCH] Python SDK: introduce deferred garbage collection queue (#4583) This introduces a new deferred system to clear Arrow garbage that was originally allocated in Python land. This should fix all deadlocks/segfaults/aborts past, present and future... or not :smiling_face_with_tear: NOTE: This lives in parallel to the already existing `ALL_RECORDINGS` thingy, which is still needed to avoid killing and joining threads at a bad time. --- Cargo.lock | 21 +-- crates/re_log_types/src/arrow_msg.rs | 70 +++++++++- crates/re_log_types/src/data_table.rs | 2 + crates/re_log_types/src/data_table_batcher.rs | 10 +- crates/re_log_types/src/lib.rs | 2 +- crates/re_sdk/src/log_sink.rs | 14 ++ crates/re_sdk/src/recording_stream.rs | 21 +-- crates/re_sdk_comms/src/buffered_client.rs | 49 +------ rerun_py/Cargo.toml | 1 + rerun_py/src/python_bridge.rs | 124 ++++++++++++++---- tests/python/gil_stress/main.py | 51 +++++++ 11 files changed, 270 insertions(+), 95 deletions(-) create mode 100644 tests/python/gil_stress/main.py diff --git a/Cargo.lock b/Cargo.lock index fde3d47055df..e1522417ab88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,15 +2868,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.0" @@ -4239,7 +4230,7 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -4260,7 +4251,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.32", @@ -5583,6 +5574,7 @@ name = "rerun_py" version = "0.12.0-alpha.3" dependencies = [ "arrow2", + "crossbeam", "document-features", "itertools 0.12.0", "mimalloc", @@ -7418,13 +7410,14 @@ dependencies = [ [[package]] name = "which" -version = "4.4.0" +version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" dependencies = [ "either", - "libc", + "home", "once_cell", + "rustix 0.38.24", ] [[package]] diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index d7b8970dc08e..20d1ea574397 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -3,12 +3,68 @@ //! We have custom implementations of [`serde::Serialize`] and [`serde::Deserialize`] that wraps //! the inner Arrow serialization of [`Schema`] and [`Chunk`]. +use std::sync::Arc; + use crate::{TableId, TimePoint}; use arrow2::{array::Array, chunk::Chunk, datatypes::Schema}; +/// An arbitrary callback to be run when an [`ArrowMsg`], and more specifically the +/// Arrow [`Chunk`] within it, goes out of scope. +/// +/// If the [`ArrowMsg`] has been cloned in a bunch of places, the callback will run for each and +/// every instance. +/// It is up to the callback implementer to handle this, if needed. +#[allow(clippy::type_complexity)] +#[derive(Clone)] +pub struct ArrowChunkReleaseCallback(Arc>) + Send + Sync>); + +impl std::ops::Deref for ArrowChunkReleaseCallback { + type Target = dyn Fn(Chunk>) + Send + Sync; + + #[inline] + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +impl From for ArrowChunkReleaseCallback +where + F: Fn(Chunk>) + Send + Sync + 'static, +{ + #[inline] + fn from(f: F) -> Self { + Self(Arc::new(f)) + } +} + +impl ArrowChunkReleaseCallback { + #[inline] + pub fn as_ptr(&self) -> *const () { + Arc::as_ptr(&self.0).cast::<()>() + } +} + +impl PartialEq for ArrowChunkReleaseCallback { + #[inline] + fn eq(&self, other: &Self) -> bool { + std::ptr::eq(self.as_ptr(), other.as_ptr()) + } +} + +impl Eq for ArrowChunkReleaseCallback {} + +impl std::fmt::Debug for ArrowChunkReleaseCallback { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("ArrowChunkReleaseCallback") + .field(&format!("{:p}", self.as_ptr())) + .finish() + } +} + /// Message containing an Arrow payload -#[must_use] #[derive(Clone, Debug, PartialEq)] +#[must_use] pub struct ArrowMsg { /// Unique identifier for the [`crate::DataTable`] in this message. pub table_id: TableId, @@ -24,6 +80,17 @@ pub struct ArrowMsg { /// Data for all control & data columns. pub chunk: Chunk>, + + // pub on_release: Option>, + pub on_release: Option, +} + +impl Drop for ArrowMsg { + fn drop(&mut self) { + if let Some(on_release) = self.on_release.take() { + (*on_release)(self.chunk.clone() /* shallow */); + } + } } #[cfg(feature = "serde")] @@ -127,6 +194,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { timepoint_max, schema, chunk, + on_release: None, }) } else { Err(serde::de::Error::custom( diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 4c4c1349f686..0fc2567758f6 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -1048,6 +1048,7 @@ impl DataTable { timepoint_max: _, schema, chunk, + on_release: _, } = msg; Self::deserialize(*table_id, schema, chunk) @@ -1066,6 +1067,7 @@ impl DataTable { timepoint_max, schema, chunk, + on_release: None, }) } } diff --git a/crates/re_log_types/src/data_table_batcher.rs b/crates/re_log_types/src/data_table_batcher.rs index 89cf50e40e3e..d5bbe6a00b10 100644 --- a/crates/re_log_types/src/data_table_batcher.rs +++ b/crates/re_log_types/src/data_table_batcher.rs @@ -37,7 +37,7 @@ pub type DataTableBatcherResult = Result; /// Defines the different thresholds of the associated [`DataTableBatcher`]. /// /// See [`Self::default`] and [`Self::from_env`]. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct DataTableBatcherConfig { /// Duration of the periodic tick. // @@ -64,6 +64,11 @@ pub struct DataTableBatcherConfig { /// /// Unbounded if left unspecified. pub max_tables_in_flight: Option, + + /// Callback to be run when an Arrow Chunk` goes out of scope. + /// + /// See [`crate::ArrowChunkReleaseCallback`] for more information. + pub on_release: Option, } impl Default for DataTableBatcherConfig { @@ -80,6 +85,7 @@ impl DataTableBatcherConfig { flush_num_rows: u64::MAX, max_commands_in_flight: None, max_tables_in_flight: None, + on_release: None, }; /// Always flushes ASAP. @@ -89,6 +95,7 @@ impl DataTableBatcherConfig { flush_num_rows: 0, max_commands_in_flight: None, max_tables_in_flight: None, + on_release: None, }; /// Never flushes unless manually told to. @@ -98,6 +105,7 @@ impl DataTableBatcherConfig { flush_num_rows: u64::MAX, max_commands_in_flight: None, max_tables_in_flight: None, + on_release: None, }; /// Environment variable to configure [`Self::flush_tick`]. diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 52fe26104a48..1eca325a255b 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -37,7 +37,7 @@ mod data_table_batcher; use std::sync::Arc; -pub use self::arrow_msg::ArrowMsg; +pub use self::arrow_msg::{ArrowChunkReleaseCallback, ArrowMsg}; pub use self::data_cell::{DataCell, DataCellError, DataCellInner, DataCellResult}; pub use self::data_row::{ DataCellRow, DataCellVec, DataReadError, DataReadResult, DataRow, DataRowError, DataRowResult, diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index 01daceb39f1d..c5cd86762f3a 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -42,6 +42,20 @@ pub trait LogSink: Send + Sync + 'static { #[derive(Default)] pub struct BufferedSink(parking_lot::Mutex>); +impl Drop for BufferedSink { + fn drop(&mut self) { + for msg in self.0.lock().iter() { + // Sinks intentionally end up with pending SetStoreInfo messages + // these are fine to drop safely. Anything else should produce a + // warning. + if !matches!(msg, LogMsg::SetStoreInfo(_)) { + re_log::warn!("Dropping data in BufferedSink"); + return; + } + } + } +} + impl BufferedSink { /// An empty buffer. #[inline] diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index 4e1c4743d4ab..028221e97871 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -5,9 +5,10 @@ use ahash::HashMap; use crossbeam::channel::{Receiver, Sender}; use re_log_types::{ - ApplicationId, DataCell, DataCellError, DataRow, DataTable, DataTableBatcher, - DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId, StoreId, StoreInfo, - StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline, TimelineName, + ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable, + DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId, + StoreId, StoreInfo, StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline, + TimelineName, }; use re_types_core::{components::InstanceKey, AsComponents, ComponentBatch, SerializationError}; @@ -610,6 +611,7 @@ impl RecordingStreamInner { batcher_config: DataTableBatcherConfig, sink: Box, ) -> RecordingStreamResult { + let on_release = batcher_config.on_release.clone(); let batcher = DataTableBatcher::new(batcher_config)?; { @@ -636,7 +638,7 @@ impl RecordingStreamInner { .spawn({ let info = info.clone(); let batcher = batcher.clone(); - move || forwarding_thread(info, sink, cmds_rx, batcher.tables()) + move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release) }) .map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })? }; @@ -956,6 +958,7 @@ fn forwarding_thread( mut sink: Box, cmds_rx: Receiver, tables: Receiver, + on_release: Option, ) { /// Returns `true` to indicate that processing can continue; i.e. `false` means immediate /// shutdown. @@ -1018,7 +1021,7 @@ fn forwarding_thread( // NOTE: Always pop tables first, this is what makes `Command::PopPendingTables` possible, // which in turns makes `RecordingStream::flush_blocking` well defined. while let Ok(table) = tables.try_recv() { - let table = match table.to_arrow_msg() { + let mut arrow_msg = match table.to_arrow_msg() { Ok(table) => table, Err(err) => { re_log::error!(%err, @@ -1026,7 +1029,8 @@ fn forwarding_thread( continue; } }; - sink.send(LogMsg::ArrowMsg(info.store_id.clone(), table)); + arrow_msg.on_release = on_release.clone(); + sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg)); } select! { @@ -1037,7 +1041,7 @@ fn forwarding_thread( re_log::trace!("Shutting down forwarding_thread: batcher is gone"); break; }; - let table = match table.to_arrow_msg() { + let mut arrow_msg = match table.to_arrow_msg() { Ok(table) => table, Err(err) => { re_log::error!(%err, @@ -1045,7 +1049,8 @@ fn forwarding_thread( continue; } }; - sink.send(LogMsg::ArrowMsg(info.store_id.clone(), table)); + arrow_msg.on_release = on_release.clone(); + sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg)); } recv(cmds_rx) -> res => { let Ok(cmd) = res else { diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 0778931be75b..326c0d78a583 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -43,10 +43,8 @@ pub struct Client { flushed_rx: Receiver, encode_quit_tx: Sender, send_quit_tx: Sender, - drop_quit_tx: Sender, encode_join: Option>, send_join: Option>, - drop_join: Option>, /// Only used for diagnostics, not for communication after `new()`. addr: SocketAddr, @@ -65,12 +63,10 @@ impl Client { // TODO(emilk): keep track of how much memory is in each pipe // and apply back-pressure to not use too much RAM. let (msg_tx, msg_rx) = crossbeam::channel::unbounded(); - let (msg_drop_tx, msg_drop_rx) = crossbeam::channel::unbounded(); let (packet_tx, packet_rx) = crossbeam::channel::unbounded(); let (flushed_tx, flushed_rx) = crossbeam::channel::unbounded(); let (encode_quit_tx, encode_quit_rx) = crossbeam::channel::unbounded(); let (send_quit_tx, send_quit_rx) = crossbeam::channel::unbounded(); - let (drop_quit_tx, drop_quit_rx) = crossbeam::channel::unbounded(); // We don't compress the stream because we assume the SDK // and server are on the same machine and compression @@ -80,13 +76,7 @@ impl Client { let encode_join = std::thread::Builder::new() .name("msg_encoder".into()) .spawn(move || { - msg_encode( - encoding_options, - &msg_rx, - &msg_drop_tx, - &encode_quit_rx, - &packet_tx, - ); + msg_encode(encoding_options, &msg_rx, &encode_quit_rx, &packet_tx); }) .expect("Failed to spawn thread"); @@ -97,22 +87,13 @@ impl Client { }) .expect("Failed to spawn thread"); - let drop_join = std::thread::Builder::new() - .name("msg_dropper".into()) - .spawn(move || { - msg_drop(&msg_drop_rx, &drop_quit_rx); - }) - .expect("Failed to spawn thread"); - Self { msg_tx, flushed_rx, encode_quit_tx, send_quit_tx, - drop_quit_tx, encode_join: Some(encode_join), send_join: Some(send_join), - drop_join: Some(drop_join), addr, } } @@ -166,9 +147,7 @@ impl Drop for Client { self.encode_join.take().map(|j| j.join().ok()); // Then the other threads: self.send_quit_tx.send(InterruptMsg::Quit).ok(); - self.drop_quit_tx.send(QuitMsg).ok(); self.send_join.take().map(|j| j.join().ok()); - self.drop_join.take().map(|j| j.join().ok()); re_log::debug!("TCP client has shut down."); } } @@ -182,31 +161,9 @@ impl fmt::Debug for Client { } } -// We drop messages in a separate thread because the PyO3 + Arrow memory model -// means in some cases these messages actually store pointers back to -// python-managed memory. We don't want to block our send-thread waiting for the -// GIL. -fn msg_drop(msg_drop_rx: &Receiver, quit_rx: &Receiver) { - loop { - select! { - recv(msg_drop_rx) -> msg_msg => { - if msg_msg.is_err() { - re_log::trace!("Shutting down msg dropper thread: channel has closed"); - return; - } - } - recv(quit_rx) -> _quit_msg => { - re_log::trace!("Shutting down msg dropper thread: quit message received"); - return; - } - } - } -} - fn msg_encode( encoding_options: re_log_encoding::EncodingOptions, msg_rx: &Receiver, - msg_drop_tx: &Sender, quit_rx: &Receiver, packet_tx: &Sender, ) { @@ -240,10 +197,6 @@ fn msg_encode( return; } } - if msg_drop_tx.send(msg_msg).is_err() { - re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); - return; - } } recv(quit_rx) -> _quit_msg => { re_log::debug!("Shutting down msg_encode thread: quit received"); diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 5b826afab50b..1b0298d9c2e8 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -53,6 +53,7 @@ re_viewer.workspace = true re_viewport.workspace = true arrow2 = { workspace = true, features = ["io_ipc", "io_print"] } +crossbeam.workspace = true document-features.workspace = true itertools = { workspace = true } mimalloc = { workspace = true, features = ["local_dynamic_tls"] } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index fdbccb92a412..97357f9aedf0 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -2,7 +2,8 @@ #![allow(clippy::borrow_deref_ref)] // False positive due to #[pufunction] macro #![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pufunction] macro -use std::{collections::HashMap, path::PathBuf}; +use std::collections::HashMap; +use std::path::PathBuf; use itertools::Itertools; use pyo3::{ @@ -11,8 +12,6 @@ use pyo3::{ types::{PyBytes, PyDict}, }; -//use re_viewer_context::SpaceViewId; -//use re_viewport::{SpaceViewBlueprint, VIEWPORT_PATH}; use re_viewport::VIEWPORT_PATH; use re_log_types::{DataRow, EntityPathPart, StoreKind}; @@ -28,8 +27,7 @@ use re_ws_comms::RerunServerPort; // --- FFI --- -use once_cell::sync::OnceCell; -use parking_lot::Mutex; +use once_cell::sync::{Lazy, OnceCell}; // The bridge needs to have complete control over the lifetimes of the individual recordings, // otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python @@ -38,15 +36,59 @@ use parking_lot::Mutex; // // TODO(#2116): drop unused recordings fn all_recordings() -> parking_lot::MutexGuard<'static, HashMap> { - static ALL_RECORDINGS: OnceCell>> = OnceCell::new(); + static ALL_RECORDINGS: OnceCell>> = + OnceCell::new(); ALL_RECORDINGS.get_or_init(Default::default).lock() } +type GarbageChunk = arrow2::chunk::Chunk>; +type GarbageSender = crossbeam::channel::Sender; +type GarbageReceiver = crossbeam::channel::Receiver; + +/// ## Release Callbacks +/// +/// When Arrow data gets logged from Python to Rust across FFI, it carries with it a `release` +/// callback (see Arrow spec) that will be run when the data gets dropped. +/// +/// This is an issue in this case because running that callback will likely try and grab the GIL, +/// which is something that should only happen at very specific times, else we end up with deadlocks, +/// segfaults, aborts… +/// +/// ## The garbage queue +/// +/// When a [`re_log_types::LogMsg`] that was logged from Python gets dropped on the Rust side, it will end up +/// in this queue. +/// +/// The mere fact that the data still exists in this queue prevents the underlying Arrow refcount +/// to go below one, which in turn prevents the associated FFI `release` callback to run, which +/// avoids the issue mentioned above. +/// +/// When the time is right, call [`flush_garbage_queue`] to flush the queue and deallocate all the +/// accumulated data for real. +// +// NOTE: `crossbeam` rather than `std` because we need a `Send` & `Sync` receiver. +static GARBAGE_QUEUE: Lazy<(GarbageSender, GarbageReceiver)> = + Lazy::new(crossbeam::channel::unbounded); + +/// Flushes the [`GARBAGE_QUEUE`], therefore running all the associated FFI `release` callbacks. +/// +/// Any time you release the GIL (e.g. `py.allow_threads()`), try to slip in a call to this +/// function so we don't accumulate too much garbage. +fn flush_garbage_queue() { + while GARBAGE_QUEUE.1.try_recv().is_ok() { + // Implicitly dropping chunks, therefore triggering their `release` callbacks, therefore + // triggering the native Python GC. + } +} + +// --- + #[cfg(feature = "web_viewer")] fn global_web_viewer_server( ) -> parking_lot::MutexGuard<'static, Option> { - static WEB_HANDLE: OnceCell>> = - OnceCell::new(); + static WEB_HANDLE: OnceCell< + parking_lot::Mutex>, + > = OnceCell::new(); WEB_HANDLE.get_or_init(Default::default).lock() } @@ -196,7 +238,14 @@ fn new_recording( default_store_id(py, StoreKind::Recording, &application_id) }; + let mut batcher_config = re_log_types::DataTableBatcherConfig::from_env().unwrap_or_default(); + let on_release = |chunk| { + GARBAGE_QUEUE.0.send(chunk).ok(); + }; + batcher_config.on_release = Some(on_release.into()); + let recording = RecordingStreamBuilder::new(application_id) + .batcher_config(batcher_config) .is_official_example(is_official_example) .store_id(recording_id.clone()) .store_source(re_log_types::StoreSource::PythonSdk(python_version(py))) @@ -246,7 +295,14 @@ fn new_blueprint( default_store_id(py, StoreKind::Blueprint, &application_id) }; + let mut batcher_config = re_log_types::DataTableBatcherConfig::from_env().unwrap_or_default(); + let on_release = |chunk| { + GARBAGE_QUEUE.0.send(chunk).ok(); + }; + batcher_config.on_release = Some(on_release.into()); + let blueprint = RecordingStreamBuilder::new(application_id) + .batcher_config(batcher_config) .store_id(blueprint_id.clone()) .store_source(re_log_types::StoreSource::PythonSdk(python_version(py))) .default_enabled(default_enabled) @@ -280,6 +336,7 @@ fn shutdown(py: Python<'_>) { for (_, recording) in all_recordings().drain() { recording.disconnect(); } + flush_garbage_queue(); }); } @@ -351,11 +408,13 @@ fn set_global_data_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_global( + let rec = RecordingStream::set_global( rerun::StoreKind::Recording, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -381,11 +440,13 @@ fn set_thread_local_data_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_thread_local( + let rec = RecordingStream::set_thread_local( rerun::StoreKind::Recording, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -422,11 +483,13 @@ fn set_global_blueprint_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_global( + let rec = RecordingStream::set_global( rerun::StoreKind::Blueprint, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -452,11 +515,13 @@ fn set_thread_local_blueprint_recording( // NOTE: This cannot happen anymore with the new `ALL_RECORDINGS` thingy, but better safe than // sorry. py.allow_threads(|| { - RecordingStream::set_thread_local( + let rec = RecordingStream::set_thread_local( rerun::StoreKind::Blueprint, recording.map(|rec| rec.0.clone()), ) - .map(PyRecordingStream) + .map(PyRecordingStream); + flush_garbage_queue(); + rec }) } @@ -498,6 +563,7 @@ fn connect( blueprint.connect_opts(addr, flush_timeout); }; } + flush_garbage_queue(); }); Ok(()) @@ -513,9 +579,11 @@ fn save(path: &str, recording: Option<&PyRecordingStream>, py: Python<'_>) -> Py // The call to save may internally flush. // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { - recording + let res = recording .save(path) - .map_err(|err| PyRuntimeError::new_err(err.to_string())) + .map_err(|err| PyRuntimeError::new_err(err.to_string())); + flush_garbage_queue(); + res }) } @@ -529,9 +597,11 @@ fn stdout(recording: Option<&PyRecordingStream>, py: Python<'_>) -> PyResult<()> // The call to stdout may internally flush. // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { - recording + let res = recording .stdout() - .map_err(|err| PyRuntimeError::new_err(err.to_string())) + .map_err(|err| PyRuntimeError::new_err(err.to_string())); + flush_garbage_queue(); + res }) } @@ -545,7 +615,11 @@ fn memory_recording( get_data_recording(recording).map(|rec| { // The call to memory may internally flush. // Release the GIL in case any flushing behavior needs to cleanup a python object. - let inner = py.allow_threads(|| rec.memory()); + let inner = py.allow_threads(|| { + let storage = rec.memory(); + flush_garbage_queue(); + storage + }); PyMemorySinkStorage { rec: rec.0, inner } }) } @@ -570,6 +644,7 @@ impl PyMemorySinkStorage { // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { self.rec.flush_blocking(); + flush_garbage_queue(); }); MemorySinkStorage::concat_memory_sinks_as_bytes( @@ -590,6 +665,7 @@ impl PyMemorySinkStorage { // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { self.rec.flush_blocking(); + flush_garbage_queue(); }); self.inner.num_msgs() @@ -599,7 +675,6 @@ impl PyMemorySinkStorage { #[cfg(feature = "web_viewer")] #[must_use = "the tokio_runtime guard must be kept alive while using tokio"] fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> { - use once_cell::sync::Lazy; static TOKIO_RUNTIME: Lazy = Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); TOKIO_RUNTIME.enter() @@ -660,6 +735,7 @@ fn disconnect(py: Python<'_>, recording: Option<&PyRecordingStream>) { // Release the GIL in case any flushing behavior needs to cleanup a python object. py.allow_threads(|| { recording.disconnect(); + flush_garbage_queue(); }); } @@ -676,6 +752,7 @@ fn flush(py: Python<'_>, blocking: bool, recording: Option<&PyRecordingStream>) } else { recording.flush_async(); } + flush_garbage_queue(); }); } @@ -851,6 +928,7 @@ fn set_auto_space_views(enabled: bool, blueprint: Option<&PyRecordingStream>) { recording=None, ))] fn log_arrow_msg( + py: Python<'_>, entity_path: &str, components: &PyDict, timeless: bool, @@ -873,6 +951,8 @@ fn log_arrow_msg( recording.record_row(row, !timeless); + py.allow_threads(flush_garbage_queue); + Ok(()) } diff --git a/tests/python/gil_stress/main.py b/tests/python/gil_stress/main.py new file mode 100644 index 000000000000..14e266ed5abd --- /dev/null +++ b/tests/python/gil_stress/main.py @@ -0,0 +1,51 @@ +""" +Stress test for things that tend to GIL deadlock. + +Logs many large recordings that contain a lot of large rows. + +Usage: +``` +python main.py +""" +from __future__ import annotations + +import rerun as rr + +rec = rr.new_recording(application_id="test") + +rec = rr.new_recording(application_id="test") +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test", make_default=True) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test", make_thread_default=True) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") # this works +rr.set_global_data_recording(rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") # this works +rr.set_thread_local_data_recording(rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test", spawn=True) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") +rr.connect(recording=rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +rec = rr.new_recording(application_id="test") +rr.memory_recording(recording=rec) +rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +for _ in range(3): + rec = rr.new_recording(application_id="test", make_default=False, make_thread_default=False) + mem = rec.memory_recording() + rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner) + +for _ in range(3): + rec = rr.new_recording(application_id="test", make_default=False, make_thread_default=False) + rr.log("test", rr.Points3D([1, 2, 3]), recording=rec.inner)