Skip to content

Commit

Permalink
PeriodicReader::build interval registration in same thread/task
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt authored and Mindaugas Vinkelis committed Sep 20, 2024
1 parent 3347bde commit dcc3b5b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 24 deletions.
43 changes: 21 additions & 22 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,26 @@ where
let (message_sender, message_receiver) = mpsc::channel(256);

let worker = move |reader: &PeriodicReader| {
let ticker = self
.runtime
.interval(self.interval)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| Message::Export);

let messages = Box::pin(stream::select(message_receiver, ticker));

let runtime = self.runtime.clone();
self.runtime.spawn(Box::pin(
let reader = reader.clone();
self.runtime.spawn(Box::pin(async move {
let ticker = runtime
.interval(self.interval)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| Message::Export);
let messages = Box::pin(stream::select(message_receiver, ticker));
PeriodicReaderWorker {
reader: reader.clone(),
reader,
timeout: self.timeout,
runtime,
rm: ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
},
}
.run(messages),
));
.run(messages)
.await
}));
};

PeriodicReader {
Expand Down Expand Up @@ -378,15 +377,15 @@ mod tests {
metrics::data::ResourceMetrics, metrics::reader::MetricReader, metrics::SdkMeterProvider,
runtime, testing::metrics::InMemoryMetricsExporter, Resource,
};
use opentelemetry::metrics::MeterProvider;
use opentelemetry::metrics::{MeterProvider, MetricsError};
use std::sync::mpsc;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn registration_triggers_collection() {
#[test]
fn collection_triggered_by_interval() {
// Arrange
let interval = std::time::Duration::from_millis(1);
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
let reader = PeriodicReader::builder(exporter.clone(), runtime::TokioCurrentThread)
.with_interval(interval)
.build();
let (sender, receiver) = mpsc::channel();
Expand All @@ -401,16 +400,14 @@ mod tests {
})
.init();

_ = meter_provider.force_flush();

// Assert
receiver
.try_recv()
.recv()
.expect("message should be available in channel, indicating a collection occurred");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn unregistered_collect() {
#[test]
fn unregistered_collect() {
// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
Expand All @@ -423,6 +420,8 @@ mod tests {
let result = reader.collect(&mut rm);

// Assert
result.expect_err("error expected when reader is not registered");
assert!(
matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered")
);
}
}
2 changes: 0 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ fn aggregate_fn<T: Number>(
agg: &aggregation::Aggregation,
kind: InstrumentKind,
) -> Result<Option<AggregateFns<T>>> {
use aggregation::Aggregation;
fn box_val<T>(
(m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
) -> (
Expand Down Expand Up @@ -544,7 +543,6 @@ fn aggregate_fn<T: Number>(
/// | Gauge | ✓ | ✓ | | ✓ | ✓ |
/// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
use aggregation::Aggregation;
match agg {
Aggregation::Default => Ok(()),
Aggregation::ExplicitBucketHistogram { .. }
Expand Down

0 comments on commit dcc3b5b

Please sign in to comment.