Skip to content

Commit

Permalink
fix: process parallel events and potential deadlock in on_record (#95)
Browse files Browse the repository at this point in the history
## Motivation

#94 - Multi-threaded tracing drops most of the events

## Solution

I have reorganized the code so that the lock on extensions is held, but
not while `record` is called, which should still prevent the deadlock
originally fixed in #59.

### Benchmark

I've used the benchmark from #93. Please ignore the "filtered" vs
"non_filtered" distinction; for the purpose of this PR they should be
identical cases.

See results in later comment as the code got changed.
  • Loading branch information
mladedav authored Feb 26, 2024
1 parent 101dc89 commit 4f747a0
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ futures-util = { version = "0.3", default-features = false }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tracing = { version = "0.1.35", default-features = false, features = ["std", "attributes"] }
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["registry", "std", "fmt"] }

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
Expand Down
131 changes: 126 additions & 5 deletions benches/trace.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{
trace::{SpanBuilder, Tracer as _, TracerProvider as _},
trace::{Span, SpanBuilder, Tracer as _, TracerProvider as _},
Context,
};
use opentelemetry_sdk::trace::{Tracer, TracerProvider};
use opentelemetry_sdk::trace::{Config, SpanLimits, Tracer, TracerProvider};
#[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler};
use std::time::SystemTime;
use tracing::trace_span;
use tracing::{trace, trace_span};
use tracing_subscriber::prelude::*;

fn many_children(c: &mut Criterion) {
Expand Down Expand Up @@ -55,6 +55,73 @@ fn many_children(c: &mut Criterion) {
}
}

/// Benchmarks recording 1000 events on a single child span.
///
/// Registers five benchmarks in the `otel_many_events` group:
/// - `spec_baseline`: events added directly through the OpenTelemetry tracer API.
/// - `no_data_baseline`: `events_harness` under a registry with `RegistryAccessLayer`.
/// - `data_only_baseline`: `events_harness` under a registry with `OtelDataLayer`.
/// - `full_filtered`: `events_harness` under the `tracing_opentelemetry` layer backed
///   by a default `TracerProvider`.
/// - `full_not_filtered`: the same layer, with `max_events_per_span` set to 1000.
fn many_events(c: &mut Criterion) {
    /// Starts a `child` span in `cx` and adds 1000 attribute-less events to it.
    fn add_events_directly(tracer: &Tracer, cx: &Context) {
        let mut child = tracer.start_with_context("child", cx);
        (0..1000).for_each(|_| child.add_event("name", Vec::new()));
    }

    let mut events_group = c.benchmark_group("otel_many_events");

    events_group.bench_function("spec_baseline", |bencher| {
        let baseline_provider = TracerProvider::default();
        let baseline_tracer = baseline_provider.tracer("bench");
        bencher.iter(|| {
            baseline_tracer.in_span("parent", |parent_cx| {
                add_events_directly(&baseline_tracer, &parent_cx)
            });
        });
    });

    // Each block below installs its own default subscriber; the guard is
    // dropped at the end of the block, before the next subscriber is set.
    {
        let registry = tracing_subscriber::registry().with(RegistryAccessLayer);
        let _default_guard = registry.set_default();
        events_group.bench_function("no_data_baseline", |bencher| {
            bencher.iter(events_harness)
        });
    }

    {
        let registry = tracing_subscriber::registry().with(OtelDataLayer);
        let _default_guard = registry.set_default();
        events_group.bench_function("data_only_baseline", |bencher| {
            bencher.iter(events_harness)
        });
    }

    {
        let default_provider = TracerProvider::default();
        let layer = tracing_opentelemetry::layer()
            .with_tracer(default_provider.tracer("bench"))
            .with_tracked_inactivity(false);
        let registry = tracing_subscriber::registry().with(layer);
        let _default_guard = registry.set_default();

        events_group.bench_function("full_filtered", |bencher| bencher.iter(events_harness));
    }

    {
        // Raise the per-span event limit to the number of events the harness emits.
        let span_limits = SpanLimits {
            max_events_per_span: 1000,
            ..SpanLimits::default()
        };
        let config = Config {
            span_limits,
            ..Config::default()
        };
        let unlimited_provider = TracerProvider::builder().with_config(config).build();
        let layer = tracing_opentelemetry::layer()
            .with_tracer(unlimited_provider.tracer("bench"))
            .with_tracked_inactivity(false);
        let registry = tracing_subscriber::registry().with(layer);
        let _default_guard = registry.set_default();

        events_group.bench_function("full_not_filtered", |bencher| bencher.iter(events_harness));
    }
}

struct NoDataSpan;
struct RegistryAccessLayer;

Expand All @@ -73,6 +140,23 @@ where
extensions.insert(NoDataSpan);
}

/// Resolves the span an event belongs to and looks up its `NoDataSpan` extension.
///
/// The event's explicit parent is tried first; if that yields no span and the
/// event is contextual, the current span is used instead. Events for which no
/// span can be resolved are ignored.
fn on_event(
    &self,
    event: &tracing_core::Event<'_>,
    ctx: tracing_subscriber::layer::Context<'_, S>,
) {
    let owner = match event.parent().and_then(|id| ctx.span(id)) {
        Some(span) => span,
        // No span from an explicit parent: only contextual events may fall
        // back to the current span.
        None if event.is_contextual() => match ctx.lookup_current() {
            Some(current) => current,
            None => return,
        },
        None => return,
    };

    // Borrow the span's extensions mutably and look up the marker type; the
    // lookup result itself is not used.
    let mut span_extensions = owner.extensions_mut();
    let _ = span_extensions.get_mut::<NoDataSpan>();
}

fn on_close(&self, id: tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
let span = ctx.span(&id).expect("Span not found, this is a bug");
let mut extensions = span.extensions_mut();
Expand Down Expand Up @@ -100,6 +184,29 @@ where
);
}

/// Appends an empty OpenTelemetry event to the `SpanBuilder` stored on the
/// span an event belongs to.
///
/// The event's explicit parent is tried first; if that yields no span and the
/// event is contextual, the current span is used instead. Events for which no
/// span can be resolved are ignored.
///
/// # Panics
///
/// Panics if the resolved span has no `SpanBuilder` in its extensions.
fn on_event(
    &self,
    event: &tracing_core::Event<'_>,
    ctx: tracing_subscriber::layer::Context<'_, S>,
) {
    let owner = match event.parent().and_then(|id| ctx.span(id)) {
        Some(span) => span,
        // No span from an explicit parent: only contextual events may fall
        // back to the current span.
        None if event.is_contextual() => match ctx.lookup_current() {
            Some(current) => current,
            None => return,
        },
        None => return,
    };

    let mut span_extensions = owner.extensions_mut();
    let span_builder = span_extensions
        .get_mut::<SpanBuilder>()
        .expect("Builder not found in span, this is a bug");

    // Create the event list on first use, then record an event with an empty
    // name, the current time, no attributes and a dropped-attribute count of 0.
    span_builder
        .events
        .get_or_insert_with(Vec::new)
        .push(opentelemetry::trace::Event::new(
            String::new(),
            SystemTime::now(),
            Vec::new(),
            0,
        ));
}
fn on_close(&self, id: tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
let span = ctx.span(&id).expect("Span not found, this is a bug");
let mut extensions = span.extensions_mut();
Expand All @@ -124,16 +231,30 @@ fn tracing_harness() {
dummy();
}

/// Benchmark workload: enters a `parent` span, then a `child` span, and emits
/// 1000 trace-level events while the child span is entered.
fn events_harness() {
    /// Enters a `child` span and emits the 1000 `"event"` events inside it.
    fn emit_events() {
        let child = trace_span!("child");
        let _child_guard = child.enter();
        (0..1000).for_each(|_| trace!("event"));
    }

    // Keep the parent span entered for the whole duration of the workload.
    let _parent_guard = trace_span!("parent").entered();

    emit_events();
}

// Benchmark group registration for every target except Windows: the Criterion
// configuration is extended with a pprof profiler that outputs a flamegraph.
// NOTE(review): the two `targets` lines below look like the removed and added
// sides of the diff; presumably only the second one belongs in the file.
#[cfg(not(target_os = "windows"))]
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = many_children
targets = many_children, many_events
}
// Windows registration: same group name and targets, but with the default
// Criterion configuration and no profiler.
#[cfg(target_os = "windows")]
criterion_group! {
name = benches;
config = Criterion::default();
targets = many_children
targets = many_children, many_events
}
// Entry point for the `benches` group selected by the cfg attributes above.
criterion_main!(benches);
Loading

0 comments on commit 4f747a0

Please sign in to comment.