Skip to content

Commit

Permalink
Use regular lock for simplespanprocessor (open-telemetry#1612)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored and sreeo committed Mar 12, 2024
1 parent 65c3656 commit 78399b4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 125 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)

## v0.22.1

### Fixed
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
Expand Down Expand Up @@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
Expand Down
10 changes: 4 additions & 6 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug {
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
}

/// A [`LogProcessor`] that exports synchronously when logs are emitted.
///
/// # Examples
///
/// Note that the simple processor exports synchronously every time a log is
/// emitted. If you find this limiting, consider the batch processor instead.
/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
/// as they are emitted, without any batching. This is typically useful for
/// debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchLogProcessor].
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
Expand Down
65 changes: 33 additions & 32 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use crate::{
InstrumentationLibrary,
};
use async_trait::async_trait;
use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use std::fmt::{Display, Formatter};
use std::{
fmt::{Display, Formatter},
sync::{Arc, Mutex},
};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
Expand All @@ -40,42 +42,47 @@ pub fn new_test_export_span_data() -> SpanData {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TestSpanExporter {
tx_export: Sender<SpanData>,
tx_shutdown: Sender<()>,
pub export_called: Arc<Mutex<bool>>,
pub shutdown_called: Arc<Mutex<bool>>,
}

impl Default for TestSpanExporter {
fn default() -> Self {
Self::new()
}
}

impl TestSpanExporter {
pub fn new() -> Self {
TestSpanExporter {
export_called: Arc::new(Mutex::new(false)),
shutdown_called: Arc::new(Mutex::new(false)),
}
}

pub fn is_export_called(&self) -> bool {
*self.export_called.lock().unwrap()
}

pub fn is_shutdown_called(&self) -> bool {
*self.shutdown_called.lock().unwrap()
}
}

#[async_trait]
impl SpanExporter for TestSpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Box::pin(std::future::ready(Err(Into::into(err))));
}
}
fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
*self.export_called.lock().unwrap() = true;
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
let _ = self.tx_shutdown.send(()); // ignore error
*self.shutdown_called.lock().unwrap() = true;
}
}

pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
let (tx_export, rx_export) = unbounded();
let (tx_shutdown, rx_shutdown) = unbounded();
let exporter = TestSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
Expand Down Expand Up @@ -139,12 +146,6 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for TestExportError {
}
}

impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
fn from(err: SendError<T>) -> Self {
TestExportError(err.to_string())
}
}

/// A no-op instance of an [`SpanExporter`].
///
/// [`SpanExporter`]: crate::export::trace::SpanExporter
Expand Down
125 changes: 41 additions & 84 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ use opentelemetry::{
Context,
};
use std::cmp::min;
use std::{env, fmt, str::FromStr, thread, time::Duration};
use std::sync::Mutex;
use std::{env, fmt, str::FromStr, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
Expand Down Expand Up @@ -93,65 +94,19 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn shutdown(&mut self) -> TraceResult<()>;
}

/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as
/// soon as they are finished, without any batching.
/// A [SpanProcessor] that passes finished spans to the configured
/// `SpanExporter`, as soon as they are finished, without any batching. This is
/// typically useful for debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchSpanProcessor].
#[derive(Debug)]
pub struct SimpleSpanProcessor {
message_sender: crossbeam_channel::Sender<Message>,
exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();
});

Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));
}
}
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
Self {
exporter: Mutex::new(exporter),
}
}
}
Expand All @@ -166,34 +121,34 @@ impl SpanProcessor for SimpleSpanProcessor {
return;
}

if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);
}
}

fn force_flush(&self) -> TraceResult<()> {
self.signal(Message::Flush, "flushing");

// Nothing to flush for simple span processor.
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
self.signal(Message::Shutdown, "shutting down");

Ok(())
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))
}
}
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
// Expecting to address that separately in the future."")
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
Expand Down Expand Up @@ -707,6 +662,7 @@ where

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
// cargo test trace::span_processor::tests:: --features=trace,testing
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
Expand All @@ -715,7 +671,7 @@ mod tests {
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::runtime;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter,
};
use crate::trace::span_processor::{
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
Expand All @@ -729,17 +685,17 @@ mod tests {

#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
processor.on_end(new_test_export_span_data());
assert!(rx_export.recv().is_ok());
assert!(exporter.is_export_called());
let _result = processor.shutdown();
}

#[test]
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
Expand All @@ -756,15 +712,16 @@ mod tests {
instrumentation_lib: Default::default(),
};
processor.on_end(unsampled);
assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
assert!(!exporter.is_export_called());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
assert!(!exporter.is_shutdown_called());
let _result = processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
assert!(exporter.is_shutdown_called());
}

#[test]
Expand Down Expand Up @@ -863,7 +820,7 @@ mod tests {
(OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
Expand All @@ -883,7 +840,7 @@ mod tests {
env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
Expand Down

0 comments on commit 78399b4

Please sign in to comment.