From d5bf258cc83ce39c1db55a41618a8f202bb8ad1c Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 22:43:35 -0700 Subject: [PATCH] Use regular lock for simplespanprocessor (#1612) --- opentelemetry-sdk/CHANGELOG.md | 4 + opentelemetry-sdk/Cargo.toml | 5 +- opentelemetry-sdk/src/logs/log_processor.rs | 10 +- .../src/testing/trace/span_exporters.rs | 65 ++++----- opentelemetry-sdk/src/trace/span_processor.rs | 125 ++++++------------ 5 files changed, 84 insertions(+), 125 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 61d2dbb0b7..f5880ce3ae 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,10 @@ ## vNext +- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed + dependency on crossbeam-channel. + [1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files) + ## v0.22.1 ### Fixed diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index cc6f010683..cabcc84476 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" } opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true } async-std = { workspace = true, features = ["unstable"], optional = true } async-trait = { workspace = true, optional = true } -crossbeam-channel = { version = "0.5", optional = true } futures-channel = "0.3" futures-executor = { workspace = true } futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] } @@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } [features] default = ["trace"] -trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"] +trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"] -logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"] +logs = ["opentelemetry/logs", "async-trait", "serde_json"] logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"] metrics = ["opentelemetry/metrics", "glob", "async-trait"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index bda7730283..57ace8dad6 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug { fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; } -/// A [`LogProcessor`] that exports synchronously when logs are emitted. -/// -/// # Examples -/// -/// Note that the simple processor exports synchronously every time a log is -/// emitted. If you find this limiting, consider the batch processor instead. +/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon +/// as they are emitted, without any batching. This is typically useful for +/// debugging and testing. For scenarios requiring higher +/// performance/throughput, consider using [BatchLogProcessor]. #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex>, diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 92666e229f..aa5ec2d651 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -7,13 +7,15 @@ use crate::{ InstrumentationLibrary, }; use async_trait::async_trait; -use crossbeam_channel::{unbounded, Receiver, SendError, Sender}; use futures_util::future::BoxFuture; pub use opentelemetry::testing::trace::TestSpan; use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; -use std::fmt::{Display, Formatter}; +use std::{ + fmt::{Display, Formatter}, + sync::{Arc, Mutex}, +}; pub fn new_test_export_span_data() -> SpanData { let config = Config::default(); @@ -40,42 +42,47 @@ pub fn new_test_export_span_data() -> SpanData { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TestSpanExporter { - tx_export: Sender, - tx_shutdown: Sender<()>, + pub export_called: Arc>, + pub shutdown_called: Arc>, +} + +impl Default for TestSpanExporter { + fn default() -> Self { + Self::new() + } +} + +impl TestSpanExporter { + pub fn new() -> Self { + TestSpanExporter { + export_called: Arc::new(Mutex::new(false)), + shutdown_called: Arc::new(Mutex::new(false)), + } + } + + pub fn is_export_called(&self) -> bool { + *self.export_called.lock().unwrap() + } + + pub fn is_shutdown_called(&self) -> bool { + *self.shutdown_called.lock().unwrap() + } } #[async_trait] impl SpanExporter for TestSpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - for span_data in batch { - if let Err(err) = self - .tx_export - .send(span_data) - .map_err::(Into::into) - { - return Box::pin(std::future::ready(Err(Into::into(err)))); - } - } + fn export(&mut self, _batch: Vec) -> BoxFuture<'static, ExportResult> { + *self.export_called.lock().unwrap() = true; Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) { - let _ = self.tx_shutdown.send(()); // ignore error + *self.shutdown_called.lock().unwrap() = true; } } -pub fn new_test_exporter() -> (TestSpanExporter, Receiver, Receiver<()>) { - let (tx_export, rx_export) = unbounded(); - let (tx_shutdown, rx_shutdown) = unbounded(); - let exporter = TestSpanExporter { - tx_export, - tx_shutdown, - }; - (exporter, rx_export, rx_shutdown) -} - #[derive(Debug)] pub struct TokioSpanExporter { tx_export: tokio::sync::mpsc::UnboundedSender, @@ -139,12 +146,6 @@ impl From> for TestExportError { } } -impl From> for TestExportError { - fn from(err: SendError) -> Self { - TestExportError(err.to_string()) - } -} - /// A no-op instance of an [`SpanExporter`]. /// /// [`SpanExporter`]: crate::export::trace::SpanExporter diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index c9230a1603..1f8ec47c03 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -50,7 +50,8 @@ use opentelemetry::{ Context, }; use std::cmp::min; -use std::{env, fmt, str::FromStr, thread, time::Duration}; +use std::sync::Mutex; +use std::{env, fmt, str::FromStr, time::Duration}; /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; @@ -93,65 +94,19 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { fn shutdown(&mut self) -> TraceResult<()>; } -/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as -/// soon as they are finished, without any batching. +/// A [SpanProcessor] that passes finished spans to the configured +/// `SpanExporter`, as soon as they are finished, without any batching. This is +/// typically useful for debugging and testing. For scenarios requiring higher +/// performance/throughput, consider using [BatchSpanProcessor]. #[derive(Debug)] pub struct SimpleSpanProcessor { - message_sender: crossbeam_channel::Sender, + exporter: Mutex>, } impl SimpleSpanProcessor { - pub(crate) fn new(mut exporter: Box) -> Self { - let (message_sender, rx) = crossbeam_channel::unbounded(); - - let _ = thread::Builder::new() - .name("opentelemetry-exporter".to_string()) - .spawn(move || { - while let Ok(msg) = rx.recv() { - match msg { - Message::ExportSpan(span) => { - if let Err(err) = - futures_executor::block_on(exporter.export(vec![span])) - { - global::handle_error(err); - } - } - Message::Flush(sender) => { - Self::respond(&sender, "sync"); - } - Message::Shutdown(sender) => { - exporter.shutdown(); - - Self::respond(&sender, "shutdown"); - - return; - } - } - } - - exporter.shutdown(); - }); - - Self { message_sender } - } - - fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) { - let (tx, rx) = crossbeam_channel::bounded(0); - - if self.message_sender.send(msg(tx)).is_ok() { - if let Err(err) = rx.recv() { - global::handle_error(TraceError::from(format!( - "error {description} span processor: {err:?}" - ))); - } - } - } - - fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) { - if let Err(err) = sender.send(()) { - global::handle_error(TraceError::from(format!( - "could not send {description}: {err:?}" - ))); + pub(crate) fn new(exporter: Box) -> Self { + Self { + exporter: Mutex::new(exporter), } } } @@ -166,34 +121,34 @@ impl SpanProcessor for SimpleSpanProcessor { return; } - if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) { - global::handle_error(TraceError::from(format!("error processing span {:?}", err))); + let result = self + .exporter + .lock() + .map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into())) + .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span]))); + + if let Err(err) = result { + global::handle_error(err); } } fn force_flush(&self) -> TraceResult<()> { - self.signal(Message::Flush, "flushing"); - + // Nothing to flush for simple span processor. Ok(()) } fn shutdown(&mut self) -> TraceResult<()> { - self.signal(Message::Shutdown, "shutting down"); - - Ok(()) + if let Ok(mut exporter) = self.exporter.lock() { + exporter.shutdown(); + Ok(()) + } else { + Err(TraceError::Other( + "SimpleSpanProcessor mutex poison at shutdown".into(), + )) + } } } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning. -// Expecting to address that separately in the future."") -enum Message { - ExportSpan(SpanData), - Flush(crossbeam_channel::Sender<()>), - Shutdown(crossbeam_channel::Sender<()>), -} - /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. /// @@ -707,6 +662,7 @@ where #[cfg(all(test, feature = "testing", feature = "trace"))] mod tests { + // cargo test trace::span_processor::tests:: --features=trace,testing use super::{ BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, @@ -715,7 +671,7 @@ mod tests { use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::runtime; use crate::testing::trace::{ - new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, + new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter, }; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, @@ -729,17 +685,17 @@ mod tests { #[test] fn simple_span_processor_on_end_calls_export() { - let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); - let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); processor.on_end(new_test_export_span_data()); - assert!(rx_export.recv().is_ok()); + assert!(exporter.is_export_called()); let _result = processor.shutdown(); } #[test] fn simple_span_processor_on_end_skips_export_if_not_sampled() { - let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); - let processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); let unsampled = SpanData { span_context: SpanContext::empty_context(), parent_span_id: SpanId::INVALID, @@ -756,15 +712,16 @@ mod tests { instrumentation_lib: Default::default(), }; processor.on_end(unsampled); - assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err()); + assert!(!exporter.is_export_called()); } #[test] fn simple_span_processor_shutdown_calls_shutdown() { - let (exporter, _rx_export, rx_shutdown) = new_test_exporter(); - let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + assert!(!exporter.is_shutdown_called()); let _result = processor.shutdown(); - assert!(rx_shutdown.try_recv().is_ok()); + assert!(exporter.is_shutdown_called()); } #[test] @@ -863,7 +820,7 @@ mod tests { (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), ]; temp_env::with_vars(env_vars.clone(), || { - let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio); + let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio); // export batch size cannot exceed max queue size assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -883,7 +840,7 @@ mod tests { env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); temp_env::with_vars(env_vars, || { - let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio); + let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); });