Skip to content

Commit

Permalink
core: fix missed register_callsite error (#2938)
Browse files Browse the repository at this point in the history
There are 2 triggers which will cause a subscriber to receive a call to
`Subscriber::register_callsite` for a specific callsite.
1. The first time the event or span at that callsite is executed.
2. When a new subscriber is added or removed (for example, calls to
   `set_default` or `with_default`)

It is trigger (2) that will cause a new subscriber to receive
`Subscriber::register_callsite` for all the callsites which had already
been registered before it became active.

When a callsite is registered for trigger (1), the callsite starts in
state `UNREGISTERED`.

The first thread to encounter the callsite will transition it to
`REGISTERING` and determine the overall interest for the callsite by
registering with all known dispatchers (which will call into
`Subscriber::register_callsite`).

Once that is complete, the callsite is added to the list of all known
callsites and its state is transitioned to `REGISTERED`.

is (re)built for all known dispatchers. The callsite starts in state
`UNREGISTERED`. The  This calls down into
`Subscriber::register_callsite` for each subscriber. Once that is
complete, the callsite is added to the global list of known callsites.

While the callsite interest is being rebuilt, other threads that
encounter the callsite will be given `Interest::sometimes()` until the
registration is complete. However, if a new subscriber is added during
this window, all the interest for all callsites will be rebuilt, but
because the new callsite (in state `REGISTERING`) won't be included
because it isn't yet in the global list of callsites.

This can cause a case where that new subscriber being added won't
receive `Subscriber::register_callsite` before it receives the subsequent
call to `Subscriber::event` or `Subscriber::new_span`.

The documentation on [Registering Callsites] is not very explicit on
this point, but it does suggest that `Subscriber::register_callsite`
will be called before the call to either `Subscriber::event` or
`Subscriber::new_span`, and the current behavior can break this implicit
contract.

[Registering Callsites]: https://docs.rs/tracing-core/0.1.32/tracing_core/callsite/index.html#registering-callsites

This change swaps the order of rebuilding the callsite interest and
adding the callsite to the global list so that the callsite gets pushed
first, avoiding this window in which a subscriber won't get a call to
`register_callsite`.

As such, a callsite may have its interest read before it is set. In this
case, the existing implementation will return `Interest::sometimes()`
for the `DefaultCallsite` implementation. Other implementations (outside
of the `tracing` project) may perform this differently, but in this
case, there is no documented guarantee regarding the ordering.

A regression test is included which provokes the race condition 100% of
the time before the changes in this fix.

Fixes: #2743

Co-authored-by: David Barsky <me@davidbarsky.com>
  • Loading branch information
hds and davidbarsky authored Nov 25, 2024
1 parent 6f08af0 commit 8a25a16
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 5 deletions.
9 changes: 4 additions & 5 deletions tracing-core/src/callsite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,6 @@ pub fn rebuild_interest_cache() {
/// [`Callsite`]: crate::callsite::Callsite
/// [reg-docs]: crate::callsite#registering-callsites
pub fn register(callsite: &'static dyn Callsite) {
rebuild_callsite_interest(callsite, &DISPATCHERS.rebuilder());

// Is this a `DefaultCallsite`? If so, use the fancy linked list!
if callsite.private_type_id(private::Private(())).0 == TypeId::of::<DefaultCallsite>() {
let callsite = unsafe {
Expand All @@ -248,10 +246,11 @@ pub fn register(callsite: &'static dyn Callsite) {
&*(callsite as *const dyn Callsite as *const DefaultCallsite)
};
CALLSITES.push_default(callsite);
return;
} else {
CALLSITES.push_dyn(callsite);
}

CALLSITES.push_dyn(callsite);
rebuild_callsite_interest(callsite, &DISPATCHERS.rebuilder());
}

static CALLSITES: Callsites = Callsites {
Expand Down Expand Up @@ -317,8 +316,8 @@ impl DefaultCallsite {
) {
Ok(_) => {
// Okay, we advanced the state, try to register the callsite.
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
CALLSITES.push_default(self);
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
self.registration.store(Self::REGISTERED, Ordering::Release);
}
// Great, the callsite is already registered! Just load its
Expand Down
125 changes: 125 additions & 0 deletions tracing-core/tests/missed_register_callsite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::{
ptr,
sync::atomic::{AtomicPtr, Ordering},
thread::{self, JoinHandle},
time::Duration,
};

use tracing_core::{
callsite::{Callsite as _, DefaultCallsite},
dispatcher::set_default,
field::{FieldSet, Value},
span, Dispatch, Event, Kind, Level, Metadata, Subscriber,
};

struct TestSubscriber {
sleep: Duration,
callsite: AtomicPtr<Metadata<'static>>,
}

impl TestSubscriber {
fn new(sleep_micros: u64) -> Self {
Self {
sleep: Duration::from_micros(sleep_micros),
callsite: AtomicPtr::new(ptr::null_mut()),
}
}
}

impl Subscriber for TestSubscriber {
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest {
if !self.sleep.is_zero() {
thread::sleep(self.sleep);
}

self.callsite
.store(metadata as *const _ as *mut _, Ordering::SeqCst);

tracing_core::Interest::always()
}

fn event(&self, event: &tracing_core::Event<'_>) {
let stored_callsite = self.callsite.load(Ordering::SeqCst);
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _;

// This assert is the actual test.
assert_eq!(
stored_callsite, event_callsite,
"stored callsite: {stored_callsite:#?} does not match event \
callsite: {event_callsite:#?}. Was `event` called before \
`register_callsite`?"
);
}

fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
span::Id::from_u64(0)
}
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
fn enter(&self, _span: &tracing_core::span::Id) {}
fn exit(&self, _span: &tracing_core::span::Id) {}
}

fn subscriber_thread(idx: usize, register_sleep_micros: u64) -> JoinHandle<()> {
thread::Builder::new()
.name(format!("subscriber-{idx}"))
.spawn(move || {
// We use a sleep to ensure the starting order of the 2 threads.
let subscriber = TestSubscriber::new(register_sleep_micros);
let _dispatch_guard = set_default(&Dispatch::new(subscriber));

static CALLSITE: DefaultCallsite = {
// The values of the metadata are unimportant
static META: Metadata<'static> = Metadata::new(
"event ",
"module::path",
Level::INFO,
None,
None,
None,
FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)),
Kind::EVENT,
);
DefaultCallsite::new(&META)
};
let _interest = CALLSITE.interest();

let meta = CALLSITE.metadata();
let field = meta.fields().field("message").unwrap();
let message = format!("event-from-{idx}", idx = idx);
let values = [(&field, Some(&message as &dyn Value))];
let value_set = CALLSITE.metadata().fields().value_set(&values);

Event::dispatch(meta, &value_set);

// Wait a bit for everything to end (we don't want to remove the subscriber
// immediately because that will influence the test).
thread::sleep(Duration::from_millis(10));
})
.expect("failed to spawn thread")
}

/// Regression test for missing register_callsite call (#2743)
///
/// This test provokes the race condition which causes the second subscriber to not receive a
/// call to `register_callsite` before it receives a call to `event`.
///
/// Because the test depends on the interaction of multiple dispatchers in different threads,
/// it needs to be in a test file by itself.
#[test]
fn event_before_register() {
let subscriber_1_register_sleep_micros = 100;
let subscriber_2_register_sleep_micros = 0;

let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros);

// This delay ensures that the event callsite has interest() called first.
thread::sleep(Duration::from_micros(50));
let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros);

jh1.join().expect("failed to join thread");
jh2.join().expect("failed to join thread");
}
109 changes: 109 additions & 0 deletions tracing/tests/missed_register_callsite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use std::{
ptr,
sync::atomic::{AtomicPtr, Ordering},
thread::{self, JoinHandle},
time::Duration,
};

use tracing::Subscriber;
use tracing_core::{span, Metadata};

struct TestSubscriber {
creator_thread: String,
sleep: Duration,
callsite: AtomicPtr<Metadata<'static>>,
}

impl TestSubscriber {
fn new(sleep_micros: u64) -> Self {
let creator_thread = thread::current()
.name()
.unwrap_or("<unknown thread>")
.to_owned();
Self {
creator_thread,
sleep: Duration::from_micros(sleep_micros),
callsite: AtomicPtr::new(ptr::null_mut()),
}
}
}

impl Subscriber for TestSubscriber {
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest {
if !self.sleep.is_zero() {
thread::sleep(self.sleep);
}

self.callsite
.store(metadata as *const _ as *mut _, Ordering::SeqCst);
println!(
"{creator} from {thread:?}: register_callsite: {callsite:#?}",
creator = self.creator_thread,
callsite = metadata as *const _,
thread = thread::current().name(),
);
tracing_core::Interest::always()
}

fn event(&self, event: &tracing_core::Event<'_>) {
let stored_callsite = self.callsite.load(Ordering::SeqCst);
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _;

println!(
"{creator} from {thread:?}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})",
creator = self.creator_thread,
thread = thread::current().name(),
);

// This assert is the actual test.
assert_eq!(
stored_callsite, event_callsite,
"stored callsite: {stored_callsite:#?} does not match event \
callsite: {event_callsite:#?}. Was `event` called before \
`register_callsite`?"
);
}

fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
span::Id::from_u64(0)
}
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
fn enter(&self, _span: &tracing_core::span::Id) {}
fn exit(&self, _span: &tracing_core::span::Id) {}
}

fn subscriber_thread(idx: usize, register_sleep_micros: u64) -> JoinHandle<()> {
thread::Builder::new()
.name(format!("subscriber-{idx}"))
.spawn(move || {
// We use a sleep to ensure the starting order of the 2 threads.
let subscriber = TestSubscriber::new(register_sleep_micros);
let _subscriber_guard = tracing::subscriber::set_default(subscriber);

tracing::info!("event-from-{idx}", idx = idx);

// Wait a bit for everything to end (we don't want to remove the subscriber
// immediately because that will mix up the test).
thread::sleep(Duration::from_millis(100));
})
.expect("failed to spawn thread")
}

#[test]
fn event_before_register() {
let subscriber_1_register_sleep_micros = 100;
let subscriber_2_register_sleep_micros = 0;

let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros);

// This delay ensures that the event!() in the first thread is executed first.
thread::sleep(Duration::from_micros(50));
let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros);

jh1.join().expect("failed to join thread");
jh2.join().expect("failed to join thread");
}

0 comments on commit 8a25a16

Please sign in to comment.