diff --git a/Cargo.toml b/Cargo.toml index d683b72..4edf89b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ default = ["tracing-log", "metrics"] metrics = ["opentelemetry/metrics"] [dependencies] -opentelemetry = { version = "0.19.0", default-features = false, features = ["trace"] } +opentelemetry = { version = "0.20.0", default-features = false, features = ["trace"] } tracing = { version = "0.1.35", default-features = false, features = ["std"] } tracing-core = "0.1.28" tracing-subscriber = { version = "0.3.0", default-features = false, features = ["registry", "std"] } @@ -39,7 +39,8 @@ thiserror = { version = "1.0.31", optional = true } [dev-dependencies] async-trait = "0.1.56" criterion = { version = "0.4.0", default-features = false, features = ["html_reports"] } -opentelemetry-jaeger = "0.18.0" +opentelemetry-jaeger = "0.19.0" +opentelemetry-stdout = { version = "0.1.0", features = ["trace"] } pprof = { version = "0.11.1", features = ["flamegraph", "criterion"] } futures-util = { version = "0.3", default-features = false } tokio = { version = "1", features = ["full"] } diff --git a/src/layer.rs b/src/layer.rs index f4a3057..768f463 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1160,10 +1160,10 @@ mod tests { fn trace_id_from_existing_context() { let tracer = TestTracer(Arc::new(Mutex::new(None))); let subscriber = tracing_subscriber::registry().with(layer().with_tracer(tracer.clone())); - let trace_id = otel::TraceId::from(42u128.to_be_bytes()); + let trace_id = otel::TraceId::from(42u128); let existing_cx = OtelContext::current_with_span(TestSpan(otel::SpanContext::new( trace_id, - otel::SpanId::from(1u64.to_be_bytes()), + otel::SpanId::from(1u64), TraceFlags::default(), false, Default::default(), diff --git a/src/lib.rs b/src/lib.rs index 82e11c0..3ed877a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,13 +51,17 @@ //! ## Examples //! //! ``` -//! use opentelemetry::sdk::export::trace::stdout; +//! use opentelemetry::sdk::trace::TracerProvider; +//! use opentelemetry::trace::{Tracer, TracerProvider as _}; //! use tracing::{error, span}; //! use tracing_subscriber::layer::SubscriberExt; //! use tracing_subscriber::Registry; //! -//! // Create a new OpenTelemetry pipeline -//! let tracer = stdout::new_pipeline().install_simple(); +//! // Create a new OpenTelemetry trace pipeline that prints to stdout +//! let provider = TracerProvider::builder() +//! .with_simple_exporter(opentelemetry_stdout::SpanExporter::default()) +//! .build(); +//! let tracer = provider.tracer("readme_example"); //! //! // Create a tracing layer with the configured tracer //! let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); diff --git a/src/metrics.rs b/src/metrics.rs index 91b9e63..da8d1ca 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -2,11 +2,7 @@ use std::{collections::HashMap, fmt, sync::RwLock}; use tracing::{field::Visit, Subscriber}; use tracing_core::Field; -use opentelemetry::{ - metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, - sdk::metrics::controllers::BasicController, - Context as OtelContext, -}; +use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}; use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -44,7 +40,6 @@ pub(crate) enum InstrumentType { impl Instruments { pub(crate) fn update_metric( &self, - cx: &OtelContext, meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, @@ -78,7 +73,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(cx, value, &[]), + |ctr| ctr.add(value, &[]), ); } InstrumentType::CounterF64(value) => { @@ -86,7 +81,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(cx, value, &[]), + |ctr| ctr.add(value, &[]), ); } InstrumentType::UpDownCounterI64(value) => { @@ -94,7 +89,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(cx, value, &[]), + |ctr| ctr.add(value, &[]), ); } InstrumentType::UpDownCounterF64(value) => { @@ -102,7 +97,7 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(cx, value, &[]), + |ctr| ctr.add(value, &[]), ); } InstrumentType::HistogramU64(value) => { @@ -110,7 +105,7 @@ impl Instruments { &self.u64_histogram, metric_name, || meter.u64_histogram(metric_name).init(), - |rec| rec.record(cx, value, &[]), + |rec| rec.record(value, &[]), ); } InstrumentType::HistogramI64(value) => { @@ -118,7 +113,7 @@ impl Instruments { &self.i64_histogram, metric_name, || meter.i64_histogram(metric_name).init(), - |rec| rec.record(cx, value, &[]), + |rec| rec.record(value, &[]), ); } InstrumentType::HistogramF64(value) => { @@ -126,7 +121,7 @@ impl Instruments { &self.f64_histogram, metric_name, || meter.f64_histogram(metric_name).init(), - |rec| rec.record(cx, value, &[]), + |rec| rec.record(value, &[]), ); } }; @@ -144,10 +139,8 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_u64(&mut self, field: &Field, value: u64) { - let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::CounterU64(value), metric_name, @@ -155,7 +148,6 @@ impl<'a> Visit for MetricVisitor<'a> { } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { self.instruments.update_metric( - &cx, self.meter, InstrumentType::UpDownCounterI64(value as i64), metric_name, @@ -170,7 +162,6 @@ impl<'a> Visit for MetricVisitor<'a> { } } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::HistogramU64(value), metric_name, @@ -179,24 +170,20 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_f64(&mut self, field: &Field, value: f64) { - let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::CounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::UpDownCounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::HistogramF64(value), metric_name, @@ -205,24 +192,20 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_i64(&mut self, field: &Field, value: i64) { - let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::CounterU64(value as u64), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::UpDownCounterI64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( - &cx, self.meter, InstrumentType::HistogramI64(value), metric_name, @@ -246,14 +229,14 @@ impl<'a> Visit for MetricVisitor<'a> { /// use tracing_opentelemetry::MetricsLayer; /// use tracing_subscriber::layer::SubscriberExt; /// use tracing_subscriber::Registry; -/// # use opentelemetry::sdk::metrics::controllers::BasicController; +/// # use opentelemetry::sdk::metrics::MeterProvider; /// -/// // Constructing a BasicController is out-of-scope for the docs here, but there +/// // Constructing a MeterProvider is out-of-scope for the docs here, but there /// // are examples in the opentelemetry repository. See: -/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52 -/// # let controller: BasicController = unimplemented!(); +/// // https://github.com/open-telemetry/opentelemetry-rust/blob/dfeac078ff7853e7dc814778524b93470dfa5c9c/examples/metrics-basic/src/main.rs#L7 +/// # let meter_provider: MeterProvider = unimplemented!(); /// -/// let opentelemetry_metrics = MetricsLayer::new(controller); +/// let opentelemetry_metrics = MetricsLayer::new(meter_provider); /// let subscriber = Registry::default().with(opentelemetry_metrics); /// tracing::subscriber::set_global_default(subscriber).unwrap(); /// ``` @@ -343,9 +326,16 @@ pub struct MetricsLayer { impl MetricsLayer { /// Create a new instance of MetricsLayer. - pub fn new(controller: BasicController) -> Self { - let meter = - controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None); + pub fn new(meter_provider: M) -> Self + where + M: MeterProvider, + { + let meter = meter_provider.versioned_meter( + INSTRUMENTATION_LIBRARY_NAME, + Some(CARGO_PKG_VERSION), + None::<&'static str>, + None, + ); MetricsLayer { meter, instruments: Default::default(), diff --git a/src/tracer.rs b/src/tracer.rs index 66c52fe..adb5569 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -88,7 +88,6 @@ impl PreSampledTracer for Tracer { builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal), builder.attributes.as_ref().unwrap_or(&OrderMap::default()), builder.links.as_deref().unwrap_or(&[]), - self.instrumentation_library(), )); process_sampling_result( @@ -169,7 +168,7 @@ mod tests { let provider = TracerProvider::default(); let tracer = provider.tracer("test"); let mut builder = SpanBuilder::from_name("empty".to_string()); - builder.span_id = Some(SpanId::from(1u64.to_be_bytes())); + builder.span_id = Some(SpanId::from(1u64)); builder.trace_id = None; let parent_cx = OtelContext::new(); let cx = tracer.sampled_context(&mut OtelData { builder, parent_cx }); @@ -224,8 +223,8 @@ mod tests { fn span_context(trace_flags: TraceFlags, is_remote: bool) -> SpanContext { SpanContext::new( - TraceId::from(1u128.to_be_bytes()), - SpanId::from(1u64.to_be_bytes()), + TraceId::from(1u128), + SpanId::from(1u64), trace_flags, is_remote, Default::default(), diff --git a/tests/metrics_publishing.rs b/tests/metrics_publishing.rs index 7616b8c..faad19e 100644 --- a/tests/metrics_publishing.rs +++ b/tests/metrics_publishing.rs @@ -1,21 +1,19 @@ use opentelemetry::{ metrics::MetricsError, sdk::{ - export::metrics::{ - aggregation::{self, Histogram, Sum, TemporalitySelector}, - InstrumentationLibraryReader, - }, metrics::{ - aggregators::{HistogramAggregator, SumAggregator}, - controllers::BasicController, - processors, - sdk_api::{Descriptor, InstrumentKind, Number, NumberKind}, - selectors, + data::{Histogram, ResourceMetrics, Sum}, + reader::{ + AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector, + MetricReader, TemporalitySelector, + }, + InstrumentKind, ManualReader, MeterProvider, }, + Resource, }, Context, }; -use std::cmp::Ordering; +use std::{fmt::Debug, sync::Arc}; use tracing::Subscriber; use tracing_opentelemetry::MetricsLayer; use tracing_subscriber::prelude::*; @@ -25,12 +23,8 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let (subscriber, exporter) = init_subscriber( - "hello_world".to_string(), - InstrumentKind::Counter, - NumberKind::U64, - Number::from(1_u64), - ); + let (subscriber, exporter) = + init_subscriber("hello_world".to_string(), InstrumentKind::Counter, 1_u64); tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); @@ -41,12 +35,8 @@ async fn u64_counter_is_exported() { #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let (subscriber, exporter) = init_subscriber( - "hello_world2".to_string(), - InstrumentKind::Counter, - NumberKind::U64, - Number::from(1_u64), - ); + let (subscriber, exporter) = + init_subscriber("hello_world2".to_string(), InstrumentKind::Counter, 1_u64); tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); @@ -60,8 +50,7 @@ async fn f64_counter_is_exported() { let (subscriber, exporter) = init_subscriber( "float_hello_world".to_string(), InstrumentKind::Counter, - NumberKind::F64, - Number::from(1.000000123_f64), + 1.000000123_f64, ); tracing::subscriber::with_default(subscriber, || { @@ -73,12 +62,8 @@ async fn f64_counter_is_exported() { #[tokio::test] async fn i64_up_down_counter_is_exported() { - let (subscriber, exporter) = init_subscriber( - "pebcak".to_string(), - InstrumentKind::UpDownCounter, - NumberKind::I64, - Number::from(-5_i64), - ); + let (subscriber, exporter) = + init_subscriber("pebcak".to_string(), InstrumentKind::UpDownCounter, -5_i64); tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); @@ -89,12 +74,8 @@ async fn i64_up_down_counter_is_exported() { #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let (subscriber, exporter) = init_subscriber( - "pebcak2".to_string(), - InstrumentKind::UpDownCounter, - NumberKind::I64, - Number::from(5_i64), - ); + let (subscriber, exporter) = + init_subscriber("pebcak2".to_string(), InstrumentKind::UpDownCounter, 5_i64); tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); @@ -108,8 +89,7 @@ async fn f64_up_down_counter_is_exported() { let (subscriber, exporter) = init_subscriber( "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, - NumberKind::F64, - Number::from(99.123_f64), + 99.123_f64, ); tracing::subscriber::with_default(subscriber, || { @@ -121,15 +101,11 @@ async fn f64_up_down_counter_is_exported() { #[tokio::test] async fn u64_histogram_is_exported() { - let (subscriber, exporter) = init_subscriber( - "abcdefg".to_string(), - InstrumentKind::Histogram, - NumberKind::U64, - Number::from(9_u64), - ); + let (subscriber, exporter) = + init_subscriber("abcdefg".to_string(), InstrumentKind::Histogram, 9_u64); tracing::subscriber::with_default(subscriber, || { - tracing::info!(value.abcdefg = 9_u64); + tracing::info!(histogram.abcdefg = 9_u64); }); exporter.export().unwrap(); @@ -140,12 +116,11 @@ async fn i64_histogram_is_exported() { let (subscriber, exporter) = init_subscriber( "abcdefg_auenatsou".to_string(), InstrumentKind::Histogram, - NumberKind::I64, - Number::from(-19_i64), + -19_i64, ); tracing::subscriber::with_default(subscriber, || { - tracing::info!(value.abcdefg_auenatsou = -19_i64); + tracing::info!(histogram.abcdefg_auenatsou = -19_i64); }); exporter.export().unwrap(); @@ -156,126 +131,145 @@ async fn f64_histogram_is_exported() { let (subscriber, exporter) = init_subscriber( "abcdefg_racecar".to_string(), InstrumentKind::Histogram, - NumberKind::F64, - Number::from(777.0012_f64), + 777.0012_f64, ); tracing::subscriber::with_default(subscriber, || { - tracing::info!(value.abcdefg_racecar = 777.0012_f64); + tracing::info!(histogram.abcdefg_racecar = 777.0012_f64); }); exporter.export().unwrap(); } -fn init_subscriber( +fn init_subscriber( expected_metric_name: String, expected_instrument_kind: InstrumentKind, - expected_number_kind: NumberKind, - expected_value: Number, -) -> (impl Subscriber + 'static, TestExporter) { - let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory( - selectors::simple::histogram(vec![-10.0, 100.0]), - aggregation::cumulative_temporality_selector(), - )) - .build(); + expected_value: T, +) -> (impl Subscriber + 'static, TestExporter) { + let reader = ManualReader::builder() + .with_aggregation_selector(Box::new(DefaultAggregationSelector::new())) + .with_temporality_selector(DefaultTemporalitySelector::new()) + .build(); + let reader = TestReader { + inner: Arc::new(reader), + }; + + let provider = MeterProvider::builder().with_reader(reader.clone()).build(); let exporter = TestExporter { expected_metric_name, expected_instrument_kind, - expected_number_kind, expected_value, - controller: controller.clone(), + reader, + _meter_provider: provider.clone(), }; ( - tracing_subscriber::registry().with(MetricsLayer::new(controller)), + tracing_subscriber::registry().with(MetricsLayer::new(provider)), exporter, ) } -#[derive(Clone, Debug)] -struct TestExporter { +#[derive(Debug, Clone)] +struct TestReader { + inner: Arc, +} + +impl AggregationSelector for TestReader { + fn aggregation(&self, kind: InstrumentKind) -> opentelemetry::sdk::metrics::Aggregation { + self.inner.aggregation(kind) + } +} + +impl TemporalitySelector for TestReader { + fn temporality(&self, kind: InstrumentKind) -> opentelemetry::sdk::metrics::data::Temporality { + self.inner.temporality(kind) + } +} + +impl MetricReader for TestReader { + fn register_pipeline(&self, pipeline: std::sync::Weak) { + self.inner.register_pipeline(pipeline); + } + + fn register_producer( + &self, + producer: Box, + ) { + self.inner.register_producer(producer); + } + + fn collect( + &self, + rm: &mut opentelemetry::sdk::metrics::data::ResourceMetrics, + ) -> opentelemetry::metrics::Result<()> { + self.inner.collect(rm) + } + + fn force_flush(&self, cx: &Context) -> opentelemetry::metrics::Result<()> { + self.inner.force_flush(cx) + } + + fn shutdown(&self) -> opentelemetry::metrics::Result<()> { + self.inner.shutdown() + } +} + +struct TestExporter { expected_metric_name: String, expected_instrument_kind: InstrumentKind, - expected_number_kind: NumberKind, - expected_value: Number, - controller: BasicController, + expected_value: T, + reader: TestReader, + _meter_provider: MeterProvider, } -impl TestExporter { +impl TestExporter +where + T: Debug + PartialEq + Copy + std::iter::Sum + 'static, +{ fn export(&self) -> Result<(), MetricsError> { - self.controller.collect(&Context::current())?; - self.controller.try_for_each(&mut |library, reader| { - reader.try_for_each(self, &mut |record| { - assert_eq!(self.expected_metric_name, record.descriptor().name()); - assert_eq!( - self.expected_instrument_kind, - *record.descriptor().instrument_kind() - ); - assert_eq!( - self.expected_number_kind, - *record.descriptor().number_kind() - ); + let mut rm = ResourceMetrics { + resource: Resource::default(), + scope_metrics: Vec::new(), + }; + self.reader.collect(&mut rm)?; + + assert!(!rm.scope_metrics.is_empty()); + + rm.scope_metrics.into_iter().for_each(|scope_metrics| { + assert_eq!(scope_metrics.scope.name, INSTRUMENTATION_LIBRARY_NAME); + assert_eq!( + scope_metrics.scope.version.unwrap().as_ref(), + CARGO_PKG_VERSION + ); + + scope_metrics.metrics.into_iter().for_each(|metric| { + assert_eq!(metric.name, self.expected_metric_name); + match self.expected_instrument_kind { InstrumentKind::Counter | InstrumentKind::UpDownCounter => { - let number = record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .sum() - .unwrap(); - + let sum = metric.data.as_any().downcast_ref::>().unwrap(); assert_eq!( - Ordering::Equal, - number - .partial_cmp(&NumberKind::U64, &self.expected_value) - .unwrap() + self.expected_value, + sum.data_points + .iter() + .map(|data_point| data_point.value) + .sum() ); } InstrumentKind::Histogram => { - let histogram = record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .histogram() - .unwrap(); - - let counts = histogram.counts(); - if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 { - assert_eq!(counts, &[0.0, 0.0, 1.0]); - } else if self.expected_value.to_i64(&self.expected_number_kind) > 0 { - assert_eq!(counts, &[0.0, 1.0, 0.0]); - } else { - assert_eq!(counts, &[1.0, 0.0, 0.0]); - } + let histogram = + metric.data.as_any().downcast_ref::>().unwrap(); + let histogram_data = histogram.data_points.first().unwrap(); + assert!(histogram_data.count > 0); + assert_eq!(histogram_data.sum, self.expected_value); } - _ => panic!( - "InstrumentKind {:?} not currently supported!", - self.expected_instrument_kind - ), - }; - - // The following are the same regardless of the individual metric. - assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name); - assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap()); - - Ok(()) - }) - }) - } -} + unexpected => { + panic!("InstrumentKind {:?} not currently supported!", unexpected) + } + } + }); + }); -impl TemporalitySelector for TestExporter { - fn temporality_for( - &self, - _descriptor: &Descriptor, - _kind: &aggregation::AggregationKind, - ) -> aggregation::Temporality { - // I don't think the value here makes a difference since - // we are just testing a single metric. - aggregation::Temporality::Cumulative + Ok(()) } }