diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 7821786675..c07200b274 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -114,18 +114,16 @@ where let (message_sender, message_receiver) = mpsc::channel(256); let worker = move |reader: &PeriodicReader| { - let ticker = self - .runtime - .interval(self.interval) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| Message::Export); - - let messages = Box::pin(stream::select(message_receiver, ticker)); - let runtime = self.runtime.clone(); - self.runtime.spawn(Box::pin( + let reader = reader.clone(); + self.runtime.spawn(Box::pin(async move { + let ticker = runtime + .interval(self.interval) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| Message::Export); + let messages = Box::pin(stream::select(message_receiver, ticker)); PeriodicReaderWorker { - reader: reader.clone(), + reader, timeout: self.timeout, runtime, rm: ResourceMetrics { @@ -133,8 +131,9 @@ where scope_metrics: Vec::new(), }, } - .run(messages), - )); + .run(messages) + .await + })); }; PeriodicReader { @@ -378,15 +377,15 @@ mod tests { metrics::data::ResourceMetrics, metrics::reader::MetricReader, metrics::SdkMeterProvider, runtime, testing::metrics::InMemoryMetricsExporter, Resource, }; - use opentelemetry::metrics::MeterProvider; + use opentelemetry::metrics::{MeterProvider, MetricsError}; use std::sync::mpsc; - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn registration_triggers_collection() { + #[test] + fn collection_triggered_by_interval() { // Arrange let interval = std::time::Duration::from_millis(1); let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio) + let reader = PeriodicReader::builder(exporter.clone(), runtime::TokioCurrentThread) .with_interval(interval) .build(); let (sender, receiver) = mpsc::channel(); @@ -401,16 +400,14 @@ mod tests { }) .init(); - _ = meter_provider.force_flush(); - // Assert receiver - .try_recv() + .recv() .expect("message should be available in channel, indicating a collection occurred"); } - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn unregistered_collect() { + #[test] + fn unregistered_collect() { // Arrange let exporter = InMemoryMetricsExporter::default(); let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); @@ -423,6 +420,8 @@ mod tests { let result = reader.collect(&mut rm); // Assert - result.expect_err("error expected when reader is not registered"); + assert!( + matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered") + ); } } diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 6e0f139809..c4638eb28c 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -470,7 +470,6 @@ fn aggregate_fn( agg: &aggregation::Aggregation, kind: InstrumentKind, ) -> Result>> { - use aggregation::Aggregation; fn box_val( (m, ca): (impl internal::Measure, impl internal::ComputeAggregation), ) -> ( @@ -544,7 +543,6 @@ fn aggregate_fn( /// | Gauge | ✓ | ✓ | | ✓ | ✓ | /// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ | fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> { - use aggregation::Aggregation; match agg { Aggregation::Default => Ok(()), Aggregation::ExplicitBucketHistogram { .. }