Python SDK: introduce deferred garbage collection queue (#4583)
This introduces a new deferred system to clear Arrow garbage that was
originally allocated in Python land.

This should fix all deadlocks/segfaults/aborts past, present and
future... or not 🥲

NOTE: This lives in parallel to the already existing `ALL_RECORDINGS`
thingy, which is still needed to avoid killing and joining threads at a
bad time.
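To make the mechanism concrete, here is a minimal, self-contained model of the idea (all names are illustrative, not taken from this commit): instead of letting the last reference to a chunk die on whatever thread happens to hold it, the release path only enqueues the data, and a known-safe point drains the queue and performs the actual deallocation.

use std::sync::mpsc::channel;

// Stand-in for an Arrow chunk backed by memory allocated in Python land.
struct Garbage(Vec<u8>);

fn main() {
    let (gc_tx, gc_rx) = channel::<Garbage>();

    // Release path: defer instead of freeing on the current thread.
    let defer = move |g: Garbage| {
        gc_tx.send(g).ok();
    };
    defer(Garbage(vec![0; 1024]));

    // Safe point (e.g. a flush): drain the queue and actually free.
    for g in gc_rx.try_iter() {
        drop(g);
    }
}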
teh-cmc authored Jan 2, 2024
1 parent 40338e2 commit 61c23af
Showing 11 changed files with 270 additions and 95 deletions.
21 changes: 7 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

70 changes: 69 additions & 1 deletion crates/re_log_types/src/arrow_msg.rs
@@ -3,12 +3,68 @@
//! We have custom implementations of [`serde::Serialize`] and [`serde::Deserialize`] that wrap
//! the inner Arrow serialization of [`Schema`] and [`Chunk`].
use std::sync::Arc;

use crate::{TableId, TimePoint};
use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};

/// An arbitrary callback to be run when an [`ArrowMsg`], and more specifically the
/// Arrow [`Chunk`] within it, goes out of scope.
///
/// If the [`ArrowMsg`] has been cloned in a bunch of places, the callback will run for each and
/// every instance.
/// It is up to the callback implementer to handle this, if needed.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct ArrowChunkReleaseCallback(Arc<dyn Fn(Chunk<Box<dyn Array>>) + Send + Sync>);

impl std::ops::Deref for ArrowChunkReleaseCallback {
type Target = dyn Fn(Chunk<Box<dyn Array>>) + Send + Sync;

#[inline]
fn deref(&self) -> &Self::Target {
&*self.0
}
}

impl<F> From<F> for ArrowChunkReleaseCallback
where
F: Fn(Chunk<Box<dyn Array>>) + Send + Sync + 'static,
{
#[inline]
fn from(f: F) -> Self {
Self(Arc::new(f))
}
}

impl ArrowChunkReleaseCallback {
#[inline]
pub fn as_ptr(&self) -> *const () {
Arc::as_ptr(&self.0).cast::<()>()
}
}

impl PartialEq for ArrowChunkReleaseCallback {
#[inline]
fn eq(&self, other: &Self) -> bool {
std::ptr::eq(self.as_ptr(), other.as_ptr())
}
}

impl Eq for ArrowChunkReleaseCallback {}

impl std::fmt::Debug for ArrowChunkReleaseCallback {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ArrowChunkReleaseCallback")
.field(&format!("{:p}", self.as_ptr()))
.finish()
}
}
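A short sketch of how this type behaves in practice, assuming `re_log_types` and `arrow2` as dependencies: callbacks are built from closures via the `From` impl above, and equality is pointer identity of the shared `Arc`, so clones compare equal while separately-constructed callbacks never do.

use arrow2::{array::Array, chunk::Chunk};
use re_log_types::ArrowChunkReleaseCallback;

fn main() {
    let a: ArrowChunkReleaseCallback = (|_chunk: Chunk<Box<dyn Array>>| {}).into();
    let b = a.clone();
    let c: ArrowChunkReleaseCallback = (|_chunk: Chunk<Box<dyn Array>>| {}).into();

    assert_eq!(a, b); // clones share one Arc, hence one pointer
    assert_ne!(a, c); // a fresh Arc around an identical closure is still distinct
}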

/// Message containing an Arrow payload
#[derive(Clone, Debug, PartialEq)]
#[must_use]
pub struct ArrowMsg {
/// Unique identifier for the [`crate::DataTable`] in this message.
pub table_id: TableId,
Expand All @@ -24,6 +80,17 @@ pub struct ArrowMsg {

/// Data for all control & data columns.
pub chunk: Chunk<Box<dyn Array>>,

/// Optional callback run once the Arrow chunk in this message goes out of scope.
pub on_release: Option<ArrowChunkReleaseCallback>,
}

impl Drop for ArrowMsg {
fn drop(&mut self) {
if let Some(on_release) = self.on_release.take() {
(*on_release)(self.chunk.clone() /* shallow */);
}
}
}

#[cfg(feature = "serde")]
@@ -127,6 +194,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
timepoint_max,
schema,
chunk,
on_release: None,
})
} else {
Err(serde::de::Error::custom(
2 changes: 2 additions & 0 deletions crates/re_log_types/src/data_table.rs
@@ -1048,6 +1048,7 @@ impl DataTable {
timepoint_max: _,
schema,
chunk,
on_release: _,
} = msg;

Self::deserialize(*table_id, schema, chunk)
@@ -1066,6 +1067,7 @@
timepoint_max,
schema,
chunk,
on_release: None,
})
}
}
10 changes: 9 additions & 1 deletion crates/re_log_types/src/data_table_batcher.rs
@@ -37,7 +37,7 @@ pub type DataTableBatcherResult<T> = Result<T, DataTableBatcherError>;
/// Defines the different thresholds of the associated [`DataTableBatcher`].
///
/// See [`Self::default`] and [`Self::from_env`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataTableBatcherConfig {
/// Duration of the periodic tick.
//
@@ -64,6 +64,11 @@ pub struct DataTableBatcherConfig {
///
/// Unbounded if left unspecified.
pub max_tables_in_flight: Option<u64>,

/// Callback to be run when an Arrow `Chunk` goes out of scope.
///
/// See [`crate::ArrowChunkReleaseCallback`] for more information.
pub on_release: Option<crate::ArrowChunkReleaseCallback>,
}

impl Default for DataTableBatcherConfig {
@@ -80,6 +85,7 @@ impl DataTableBatcherConfig {
flush_num_rows: u64::MAX,
max_commands_in_flight: None,
max_tables_in_flight: None,
on_release: None,
};

/// Always flushes ASAP.
@@ -89,6 +95,7 @@
flush_num_rows: 0,
max_commands_in_flight: None,
max_tables_in_flight: None,
on_release: None,
};

/// Never flushes unless manually told to.
@@ -98,6 +105,7 @@
flush_num_rows: u64::MAX,
max_commands_in_flight: None,
max_tables_in_flight: None,
on_release: None,
};

/// Environment variable to configure [`Self::flush_tick`].
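Putting the new field to use might look like the following sketch (hedged: the channel and the drain site are hypothetical, though `crossbeam` is already a dependency of this workspace, as the imports in `recording_stream.rs` below show):

use arrow2::{array::Array, chunk::Chunk};
use re_log_types::DataTableBatcherConfig;

fn main() {
    // Collect released chunks instead of freeing them inline.
    let (tx, rx) = crossbeam::channel::unbounded::<Chunk<Box<dyn Array>>>();

    let config = DataTableBatcherConfig {
        on_release: Some((move |chunk: Chunk<Box<dyn Array>>| {
            tx.send(chunk).ok(); // defer; don't free on this thread
        })
        .into()),
        ..Default::default()
    };

    // `config` would be handed to the batcher / recording stream here,
    // and `rx` drained at a known-safe point.
    let _ = (config, rx);
}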
2 changes: 1 addition & 1 deletion crates/re_log_types/src/lib.rs
@@ -37,7 +37,7 @@ mod data_table_batcher;

use std::sync::Arc;

pub use self::arrow_msg::{ArrowChunkReleaseCallback, ArrowMsg};
pub use self::data_cell::{DataCell, DataCellError, DataCellInner, DataCellResult};
pub use self::data_row::{
DataCellRow, DataCellVec, DataReadError, DataReadResult, DataRow, DataRowError, DataRowResult,
14 changes: 14 additions & 0 deletions crates/re_sdk/src/log_sink.rs
@@ -42,6 +42,20 @@ pub trait LogSink: Send + Sync + 'static {
#[derive(Default)]
pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);

impl Drop for BufferedSink {
fn drop(&mut self) {
for msg in self.0.lock().iter() {
// Sinks intentionally end up with pending SetStoreInfo messages;
// these are safe to drop. Anything else should produce a warning.
if !matches!(msg, LogMsg::SetStoreInfo(_)) {
re_log::warn!("Dropping data in BufferedSink");
return;
}
}
}
}

impl BufferedSink {
/// An empty buffer.
#[inline]
21 changes: 13 additions & 8 deletions crates/re_sdk/src/recording_stream.rs
@@ -5,9 +5,10 @@ use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};

use re_log_types::{
ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable,
DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId,
StoreId, StoreInfo, StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline,
TimelineName,
};
use re_types_core::{components::InstanceKey, AsComponents, ComponentBatch, SerializationError};

@@ -610,6 +611,7 @@ impl RecordingStreamInner {
batcher_config: DataTableBatcherConfig,
sink: Box<dyn LogSink>,
) -> RecordingStreamResult<Self> {
let on_release = batcher_config.on_release.clone();
let batcher = DataTableBatcher::new(batcher_config)?;

{
@@ -636,7 +638,7 @@
.spawn({
let info = info.clone();
let batcher = batcher.clone();
move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release)
})
.map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })?
};
@@ -956,6 +958,7 @@ fn forwarding_thread(
mut sink: Box<dyn LogSink>,
cmds_rx: Receiver<Command>,
tables: Receiver<DataTable>,
on_release: Option<ArrowChunkReleaseCallback>,
) {
/// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
/// shutdown.
@@ -1018,15 +1021,16 @@
// NOTE: Always pop tables first, this is what makes `Command::PopPendingTables` possible,
// which in turns makes `RecordingStream::flush_blocking` well defined.
while let Ok(table) = tables.try_recv() {
let mut arrow_msg = match table.to_arrow_msg() {
Ok(table) => table,
Err(err) => {
re_log::error!(%err,
"couldn't serialize table; data dropped (this is a bug in Rerun!)");
continue;
}
};
arrow_msg.on_release = on_release.clone();
sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg));
}

select! {
@@ -1037,15 +1041,16 @@
re_log::trace!("Shutting down forwarding_thread: batcher is gone");
break;
};
let mut arrow_msg = match table.to_arrow_msg() {
Ok(table) => table,
Err(err) => {
re_log::error!(%err,
"couldn't serialize table; data dropped (this is a bug in Rerun!)");
continue;
}
};
arrow_msg.on_release = on_release.clone();
sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg));
}
recv(cmds_rx) -> res => {
let Ok(cmd) = res else {
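The Python-side half of the commit — the deferred garbage collection queue named in the title — is among the changed files not rendered in this excerpt, so the following is only an assumed sketch of the consuming end: a global queue that the release callback feeds, drained at points where freeing is known to be safe.

use std::sync::Mutex;

use arrow2::{array::Array, chunk::Chunk};

// Hypothetical global queue; the real one lives in the Python bindings.
static GARBAGE: Mutex<Vec<Chunk<Box<dyn Array>>>> = Mutex::new(Vec::new());

// Cheap: moves the refcounted Arrow buffers onto the queue, frees nothing yet.
fn on_release(chunk: Chunk<Box<dyn Array>>) {
    GARBAGE.lock().unwrap().push(chunk);
}

// Called at safe points (e.g. around flushes) to actually drop the buffers.
fn flush_garbage() {
    GARBAGE.lock().unwrap().drain(..).for_each(drop);
}

fn main() {
    flush_garbage();
    let _ = on_release; // referenced here only to keep the sketch warning-free
}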
