From eb0444bb252e160735111fea4eac696690ef695a Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Tue, 11 Jul 2023 16:35:22 +0200 Subject: [PATCH 1/7] Detect and cleanup globals if the process has been forked --- crates/re_sdk/src/global.rs | 79 +++++++++++++++++++++++++++ crates/re_sdk/src/lib.rs | 2 + crates/re_sdk/src/recording_stream.rs | 30 ++++++++++ rerun_py/rerun_sdk/rerun/__init__.py | 20 +++++++ rerun_py/src/python_bridge.rs | 7 +++ 5 files changed, 138 insertions(+) diff --git a/crates/re_sdk/src/global.rs b/crates/re_sdk/src/global.rs index b7d0d3422e9f..84d1be1d44cd 100644 --- a/crates/re_sdk/src/global.rs +++ b/crates/re_sdk/src/global.rs @@ -38,6 +38,38 @@ thread_local! { static LOCAL_BLUEPRINT_RECORDING: RefCell> = RefCell::new(None); } +/// Check whether a fork has happened since creating the recording streams. +/// If so, then we forget our globals. +pub fn cleanup_if_forked() { + if let Some(global_recording) = RecordingStream::global(StoreKind::Recording) { + if global_recording.has_forked() { + re_log::debug!("Fork detected. Forgetting global Recording"); + RecordingStream::forget_global(StoreKind::Recording); + } + } + + if let Some(global_blueprint) = RecordingStream::global(StoreKind::Blueprint) { + if global_blueprint.has_forked() { + re_log::debug!("Fork detected. Forgetting global Blueprint"); + RecordingStream::forget_global(StoreKind::Recording); + } + } + + if let Some(thread_recording) = RecordingStream::thread_local(StoreKind::Recording) { + if thread_recording.has_forked() { + re_log::debug!("Fork detected. Forgetting thread-local Recording"); + RecordingStream::forget_thread_local(StoreKind::Recording); + } + } + + if let Some(thread_blueprint) = RecordingStream::thread_local(StoreKind::Blueprint) { + if thread_blueprint.has_forked() { + re_log::debug!("Fork detected. 
Forgetting thread-local Blueprint"); + RecordingStream::forget_thread_local(StoreKind::Blueprint); + } + } +} + impl RecordingStream { /// Returns `overrides` if it exists, otherwise returns the most appropriate active recording /// of the specified type (i.e. thread-local first, then global scope), if any. @@ -106,6 +138,15 @@ impl RecordingStream { Self::set_any(RecordingScope::Global, kind, rec) } + /// Forgets the currently active recording of the specified type in the global scope. + /// + /// WARNING: this intentionally bypasses any drop/flush logic. This should only ever be used in + /// cases where you know the batcher/sink threads have been lost such as in a forked process. + #[inline] + pub fn forget_global(kind: StoreKind) { + Self::forget_any(RecordingScope::Global, kind); + } + // --- Thread local --- /// Returns the currently active recording of the specified type in the thread-local scope, @@ -125,6 +166,15 @@ impl RecordingStream { Self::set_any(RecordingScope::ThreadLocal, kind, rec) } + /// Forgets the currently active recording of the specified type in the thread-local scope. + /// + /// WARNING: this intentionally bypasses any drop/flush logic. This should only ever be used in + /// cases where you know the batcher/sink threads have been lost such as in a forked process. 
+ #[inline] + pub fn forget_thread_local(kind: StoreKind) { + Self::forget_any(RecordingScope::ThreadLocal, kind); + } + // --- Internal helpers --- fn get_any(scope: RecordingScope, kind: StoreKind) -> Option { @@ -180,6 +230,35 @@ impl RecordingStream { }, } } + + fn forget_any(scope: RecordingScope, kind: StoreKind) { + match kind { + StoreKind::Recording => match scope { + RecordingScope::Global => { + if let Some(global) = GLOBAL_DATA_RECORDING.get() { + std::mem::forget(global.write().take()); + } + } + RecordingScope::ThreadLocal => LOCAL_DATA_RECORDING.with(|cell| { + if let Some(cell) = cell.take() { + std::mem::forget(cell); + } + }), + }, + StoreKind::Blueprint => match scope { + RecordingScope::Global => { + if let Some(global) = GLOBAL_BLUEPRINT_RECORDING.get() { + std::mem::forget(global.write().take()); + } + } + RecordingScope::ThreadLocal => LOCAL_BLUEPRINT_RECORDING.with(|cell| { + if let Some(cell) = cell.take() { + std::mem::forget(cell); + } + }), + }, + } + } } // --- diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index 22a223598efa..258d78929230 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -26,6 +26,8 @@ pub use re_log_types::{ ApplicationId, Component, ComponentName, EntityPath, SerializableComponent, StoreId, StoreKind, }; +pub use global::cleanup_if_forked; + #[cfg(not(target_arch = "wasm32"))] impl crate::sink::LogSink for re_log_encoding::FileSink { fn send(&self, msg: re_log_types::LogMsg) { diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index 07f0fad9a5f5..1755a03c7f26 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -349,10 +349,17 @@ struct RecordingStreamInner { batcher: DataTableBatcher, batcher_to_sink_handle: Option>, + + pid_at_creation: u32, } impl Drop for RecordingStreamInner { fn drop(&mut self) { + if self.has_forked() { + re_log::warn_once!("Process-id mismatch while dropping 
RecordingStreamInner. Likely forked without calling cleanup_if_forked()."); + return; + } + // NOTE: The command channel is private, if we're here, nothing is currently capable of // sending data down the pipeline. self.batcher.flush_blocking(); @@ -410,8 +417,14 @@ impl RecordingStreamInner { cmds_tx, batcher, batcher_to_sink_handle: Some(batcher_to_sink_handle), + pid_at_creation: std::process::id(), }) } + + #[inline] + pub fn has_forked(&self) -> bool { + self.pid_at_creation != std::process::id() + } } enum Command { @@ -591,6 +604,18 @@ impl RecordingStream { pub fn store_info(&self) -> Option<&StoreInfo> { (*self.inner).as_ref().map(|inner| &inner.info) } + + /// Determine whether a fork has happened since creating this `RecordingStream`. In general, this means our + /// batcher/sink threads are gone and all data logged since the fork has been dropped. + /// + /// It is essential that [`crate::cleanup_if_forked`] be called after forking the process. SDK-implementations + /// should do this during their initialization phase. + #[inline] + pub fn has_forked(&self) -> bool { + (*self.inner) + .as_ref() + .map_or(false, |inner| inner.has_forked()) + } } impl RecordingStream { @@ -737,6 +762,11 @@ impl RecordingStream { /// /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees. pub fn flush_blocking(&self) { + if self.has_forked() { + re_log::warn_once!("Fork detected - call to flush_blocking() ignored. Likely forked without calling cleanup_if_forked()."); + return; + } + let Some(this) = &*self.inner else { re_log::warn_once!("Recording disabled - call to flush_blocking() ignored"); return; diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index 01afdff12197..42f00e243d5c 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -155,6 +155,10 @@ def init( global _strict_mode _strict_mode = strict + # Always check for fork when calling init. 
This should have happened via `_register_on_fork` + # but it's worth being conservative. + # cleanup_if_forked() + if init_logging: new_recording( application_id, @@ -311,6 +315,22 @@ def unregister_shutdown() -> None: atexit.unregister(rerun_shutdown) +def cleanup_if_forked() -> None: + bindings.cleanup_if_forked() + + +def _register_on_fork() -> None: + # Only relevant on Linux + try: + import os + + os.register_at_fork(after_in_child=cleanup_if_forked) + except NotImplementedError: + pass + + +_register_on_fork() + # --- diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 554ce134bc49..bdd630615a90 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -119,6 +119,7 @@ fn rerun_bindings(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(new_recording, m)?)?; m.add_function(wrap_pyfunction!(new_blueprint, m)?)?; m.add_function(wrap_pyfunction!(shutdown, m)?)?; + m.add_function(wrap_pyfunction!(cleanup_if_forked, m)?)?; // recordings m.add_function(wrap_pyfunction!(get_application_id, m)?)?; @@ -349,6 +350,12 @@ fn get_global_data_recording() -> Option { RecordingStream::global(rerun::StoreKind::Recording).map(PyRecordingStream) } +/// Cleans up internal state if the process was forked +#[pyfunction] +fn cleanup_if_forked() { + rerun::cleanup_if_forked(); +} + /// Replaces the currently active recording in the global scope with the specified one. /// /// Returns the previous one, if any. 
From 64e7e38fd97ad5cc2139f4bc72bac902f509fb11 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Tue, 11 Jul 2023 17:44:59 +0200 Subject: [PATCH 2/7] Add a decorator to shutdown gracefully when exiting forked multiprocess contexts --- examples/python/multiprocessing/main.py | 9 ++++---- rerun_py/rerun_sdk/rerun/__init__.py | 28 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/examples/python/multiprocessing/main.py b/examples/python/multiprocessing/main.py index 479e44488edd..a4ce73c32647 100755 --- a/examples/python/multiprocessing/main.py +++ b/examples/python/multiprocessing/main.py @@ -10,6 +10,10 @@ import rerun as rr # pip install rerun-sdk +# Python does not guarantee that the normal atexit-handlers will be called at the +# termination of a multiprocessing.Process. Explicitly add the `shutdown_at_exit` +# decorator to ensure data is flushed when the task completes. +@rr.shutdown_at_exit def task(child_index: int) -> None: # All processes spawned with `multiprocessing` will automatically # be assigned the same default recording_id. @@ -37,11 +41,6 @@ def main() -> None: task(0) - # Using multiprocessing with "fork" results in a hang on shutdown so - # always use "spawn" - # TODO(https://github.com/rerun-io/rerun/issues/1921) - multiprocessing.set_start_method("spawn") - for i in [1, 2, 3]: p = multiprocessing.Process(target=task, args=(i,)) p.start() diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index 42f00e243d5c..178787e15a91 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -1,6 +1,9 @@ """The Rerun Python SDK, which is a wrapper around the re_sdk crate.""" from __future__ import annotations +import functools +from typing import Any, Callable, TypeVar, cast + # NOTE: The imports determine what is public API. Avoid importing globally anything that is not public API. Use # (private) function and local import if needed. 
import rerun_bindings as bindings # type: ignore[attr-defined] @@ -331,6 +334,31 @@ def _register_on_fork() -> None: _register_on_fork() + +_TFunc = TypeVar("_TFunc", bound=Callable[..., Any]) + + +def shutdown_at_exit(func: _TFunc) -> _TFunc: + """ + Decorator to shutdown Rerun cleanly when this function exits. + + Normally, Rerun installs an atexit-handler that attempts to shutdown cleanly and + flush all outgoing data before terminating. However, some cases, such as forked + processes will always skip this at-exit handler. In these cases, you can use this + decorator on the entry-point to your subprocess to ensure cleanup happens as + expected without losing data. + """ + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return func(*args, **kwargs) + finally: + rerun_shutdown() + + return cast(_TFunc, wrapper) + + # --- From 2c19556f56096e675fbf9f4c3c89b05807192410 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Tue, 11 Jul 2023 17:56:08 +0200 Subject: [PATCH 3/7] Not cleaning up after forking is an error. --- crates/re_sdk/src/recording_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index 1755a03c7f26..c9e4e3571d07 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -356,7 +356,7 @@ struct RecordingStreamInner { impl Drop for RecordingStreamInner { fn drop(&mut self) { if self.has_forked() { - re_log::warn_once!("Process-id mismatch while dropping RecordingStreamInner. Likely forked without calling cleanup_if_forked()."); + re_log::error_once!("Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."); return; } @@ -763,7 +763,7 @@ impl RecordingStream { /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees. 
pub fn flush_blocking(&self) { if self.has_forked() { - re_log::warn_once!("Fork detected - call to flush_blocking() ignored. Likely forked without calling cleanup_if_forked()."); + re_log::error_once!("Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."); return; } From 679ab14a24bee8a583bd988b476204a580031069 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Tue, 11 Jul 2023 18:02:07 +0200 Subject: [PATCH 4/7] Uncomment code from testing edge cases --- rerun_py/rerun_sdk/rerun/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index 178787e15a91..08b6c37fb54f 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -160,7 +160,7 @@ def init( # Always check for fork when calling init. This should have happened via `_register_on_fork` # but it's worth being conservative. - # cleanup_if_forked() + cleanup_if_forked() if init_logging: new_recording( From b7a564f2544cf86ccd91b130f407a691b4fc31c5 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 12 Jul 2023 15:16:52 +0200 Subject: [PATCH 5/7] Be clearer about forked_child --- crates/re_sdk/src/global.rs | 17 ++++++++++------- crates/re_sdk/src/lib.rs | 2 +- crates/re_sdk/src/recording_stream.rs | 10 +++++----- examples/python/multiprocessing/main.py | 8 +++++--- rerun_py/rerun_sdk/rerun/__init__.py | 12 ++++++------ rerun_py/src/python_bridge.rs | 8 ++++---- 6 files changed, 31 insertions(+), 26 deletions(-) diff --git a/crates/re_sdk/src/global.rs b/crates/re_sdk/src/global.rs index 84d1be1d44cd..a9ac4537059c 100644 --- a/crates/re_sdk/src/global.rs +++ b/crates/re_sdk/src/global.rs @@ -38,32 +38,35 @@ thread_local! { static LOCAL_BLUEPRINT_RECORDING: RefCell> = RefCell::new(None); } -/// Check whether a fork has happened since creating the recording streams. -/// If so, then we forget our globals. 
-pub fn cleanup_if_forked() { +/// Check whether we are the child of a fork. +/// +/// If so, then our globals need to be cleaned up because they don't have associated batching +/// or sink threads. The parent of the fork will continue to process any data in the original +/// globals so nothing is being lost by doing this. +pub fn cleanup_if_forked_child() { if let Some(global_recording) = RecordingStream::global(StoreKind::Recording) { - if global_recording.has_forked() { + if global_recording.is_forked_child() { re_log::debug!("Fork detected. Forgetting global Recording"); RecordingStream::forget_global(StoreKind::Recording); } } if let Some(global_blueprint) = RecordingStream::global(StoreKind::Blueprint) { - if global_blueprint.has_forked() { + if global_blueprint.is_forked_child() { re_log::debug!("Fork detected. Forgetting global Blueprint"); - RecordingStream::forget_global(StoreKind::Recording); + RecordingStream::forget_global(StoreKind::Blueprint); } } if let Some(thread_recording) = RecordingStream::thread_local(StoreKind::Recording) { - if thread_recording.has_forked() { + if thread_recording.is_forked_child() { re_log::debug!("Fork detected. Forgetting thread-local Recording"); RecordingStream::forget_thread_local(StoreKind::Recording); } } if let Some(thread_blueprint) = RecordingStream::thread_local(StoreKind::Blueprint) { - if thread_blueprint.has_forked() { + if thread_blueprint.is_forked_child() { re_log::debug!("Fork detected. 
Forgetting thread-local Blueprint"); RecordingStream::forget_thread_local(StoreKind::Blueprint); } diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index 258d78929230..afca7d04679b 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -26,7 +26,7 @@ pub use re_log_types::{ ApplicationId, Component, ComponentName, EntityPath, SerializableComponent, StoreId, StoreKind, }; -pub use global::cleanup_if_forked; +pub use global::cleanup_if_forked_child; #[cfg(not(target_arch = "wasm32"))] impl crate::sink::LogSink for re_log_encoding::FileSink { diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index c9e4e3571d07..ad66acac580f 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -355,7 +355,7 @@ struct RecordingStreamInner { impl Drop for RecordingStreamInner { fn drop(&mut self) { - if self.has_forked() { + if self.is_forked_child() { re_log::error_once!("Fork detected while dropping RecordingStreamInner. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."); return; } @@ -422,7 +422,7 @@ impl RecordingStreamInner { } #[inline] - pub fn has_forked(&self) -> bool { + pub fn is_forked_child(&self) -> bool { self.pid_at_creation != std::process::id() } } @@ -611,10 +611,10 @@ impl RecordingStream { /// It is essential that [`crate::cleanup_if_forked`] be called after forking the process. SDK-implementations /// should do this during their initialization phase. #[inline] - pub fn has_forked(&self) -> bool { + pub fn is_forked_child(&self) -> bool { (*self.inner) .as_ref() - .map_or(false, |inner| inner.has_forked()) + .map_or(false, |inner| inner.is_forked_child()) } } @@ -762,7 +762,7 @@ impl RecordingStream { /// /// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees. 
pub fn flush_blocking(&self) { - if self.has_forked() { + if self.is_forked_child() { re_log::error_once!("Fork detected during flush. cleanup_if_forked() should always be called after forking. This is likely a bug in the SDK."); return; } diff --git a/examples/python/multiprocessing/main.py b/examples/python/multiprocessing/main.py index a4ce73c32647..b0cfe801f553 100755 --- a/examples/python/multiprocessing/main.py +++ b/examples/python/multiprocessing/main.py @@ -15,10 +15,12 @@ # decorator to ensure data is flushed when the task completes. @rr.shutdown_at_exit def task(child_index: int) -> None: - # All processes spawned with `multiprocessing` will automatically - # be assigned the same default recording_id. - # We just need to connect each process to the the rerun viewer: + # In the new process, we always need to call init with the same application id. + # The internal recording-id is carried over from the parent process, so all + # of these processes will have their log data merged in the viewer. rr.init("multiprocessing") + + # We then have to connect to the viewer instance. rr.connect() title = f"task {child_index}" diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index 08b6c37fb54f..a833576dc186 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -158,9 +158,9 @@ def init( global _strict_mode _strict_mode = strict - # Always check for fork when calling init. This should have happened via `_register_on_fork` - # but it's worth being conservative. - cleanup_if_forked() + # Always check whether we are a forked child when calling init. This should have happened + # via `_register_on_fork` but it's worth being conservative. 
+ cleanup_if_forked_child() if init_logging: new_recording( @@ -318,8 +318,8 @@ def unregister_shutdown() -> None: atexit.unregister(rerun_shutdown) -def cleanup_if_forked() -> None: - bindings.cleanup_if_forked() +def cleanup_if_forked_child() -> None: + bindings.cleanup_if_forked_child() def _register_on_fork() -> None: @@ -327,7 +327,7 @@ def _register_on_fork() -> None: try: import os - os.register_at_fork(after_in_child=cleanup_if_forked) + os.register_at_fork(after_in_child=cleanup_if_forked_child) except NotImplementedError: pass diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index bdd630615a90..d8e4b5d0f26c 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -119,7 +119,7 @@ fn rerun_bindings(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(new_recording, m)?)?; m.add_function(wrap_pyfunction!(new_blueprint, m)?)?; m.add_function(wrap_pyfunction!(shutdown, m)?)?; - m.add_function(wrap_pyfunction!(cleanup_if_forked, m)?)?; + m.add_function(wrap_pyfunction!(cleanup_if_forked_child, m)?)?; // recordings m.add_function(wrap_pyfunction!(get_application_id, m)?)?; @@ -350,10 +350,10 @@ fn get_global_data_recording() -> Option { RecordingStream::global(rerun::StoreKind::Recording).map(PyRecordingStream) } -/// Cleans up internal state if the process was forked +/// Cleans up internal state if this is the child of a forked process. #[pyfunction] -fn cleanup_if_forked() { - rerun::cleanup_if_forked(); +fn cleanup_if_forked_child() { + rerun::cleanup_if_forked_child(); } /// Replaces the currently active recording in the global scope with the specified one. 
From 8acdfdcc0525450d274607c731575c05d1f7529a Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 12 Jul 2023 15:25:14 +0200 Subject: [PATCH 6/7] Clarify recording-id comment --- examples/python/multiprocessing/main.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/python/multiprocessing/main.py b/examples/python/multiprocessing/main.py index b0cfe801f553..3f398497c71c 100755 --- a/examples/python/multiprocessing/main.py +++ b/examples/python/multiprocessing/main.py @@ -15,9 +15,11 @@ # decorator to ensure data is flushed when the task completes. @rr.shutdown_at_exit def task(child_index: int) -> None: - # In the new process, we always need to call init with the same application id. - # The internal recording-id is carried over from the parent process, so all - # of these processes will have their log data merged in the viewer. + # In the new process, we always need to call init with the same `application_id`. + # By default, the `recording_id` will match the `recording_id` of the parent process, + # so all of these processes will have their log data merged in the viewer. + # Caution: if you manually specified `recording_id` in the parent, you also must + # pass the same `recording_id` here. rr.init("multiprocessing") # We then have to connect to the viewer instance. From 55564c104b18f547ec7b851b5ead9c16d60d52a3 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 12 Jul 2023 15:45:41 +0200 Subject: [PATCH 7/7] Fix broken doclink --- crates/re_sdk/src/recording_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index ad66acac580f..053dc61c5a98 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -608,7 +608,7 @@ impl RecordingStream { /// Determine whether a fork has happened since creating this `RecordingStream`. 
In general, this means our /// batcher/sink threads are gone and all data logged since the fork has been dropped. /// - /// It is essential that [`crate::cleanup_if_forked`] be called after forking the process. SDK-implementations + /// It is essential that [`crate::cleanup_if_forked_child`] be called after forking the process. SDK-implementations /// should do this during their initialization phase. #[inline] pub fn is_forked_child(&self) -> bool {