Use regular lock for SimpleSpanProcessor #1612

Merged
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
@@ -2,6 +2,10 @@

## vNext

- Fix SimpleSpanProcessor to be consistent with its log counterpart: spans are
  now exported synchronously under a regular lock. Also removed the dependency
  on crossbeam-channel.
  [1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)

## v0.22.1

### Fixed
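The changelog entry above summarizes the behavioral change. For orientation, this is roughly how a pipeline using the simple processor is wired up; a minimal sketch assuming the 0.22-era builder API and the `opentelemetry-stdout` exporter, neither of which is touched by this PR:

```rust
use opentelemetry::trace::{Tracer, TracerProvider as _};
use opentelemetry_sdk::trace::TracerProvider;

fn main() {
    // `with_simple_exporter` wraps the exporter in a SimpleSpanProcessor. After
    // this change the processor exports under a std::sync::Mutex on the calling
    // thread instead of handing spans to a crossbeam-channel worker thread.
    let exporter = opentelemetry_stdout::SpanExporter::default();
    let provider = TracerProvider::builder()
        .with_simple_exporter(exporter)
        .build();

    let tracer = provider.tracer("example");
    tracer.in_span("work", |_cx| {
        // Each span is exported synchronously when it ends.
    });
}
```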
5 changes: 2 additions & 3 deletions opentelemetry-sdk/Cargo.toml
@@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
@@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
10 changes: 4 additions & 6 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug {
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
}

/// A [`LogProcessor`] that exports synchronously when logs are emitted.
///
/// # Examples
///
/// Note that the simple processor exports synchronously every time a log is
/// emitted. If you find this limiting, consider the batch processor instead.
/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
/// as they are emitted, without any batching. This is typically useful for
/// debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchLogProcessor].
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
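The reworded doc comment above matches the shape both simple processors now share: the exporter sits behind a regular `std::sync::Mutex`, and export happens on the caller's thread. A stripped-down sketch of that pattern, using stand-in types rather than the actual SDK traits:

```rust
use std::sync::Mutex;

// Stand-in for an exporter; the real SDK trait is async and boxed.
trait Export {
    fn export(&mut self, item: String);
    fn shutdown(&mut self) {}
}

struct SimpleProcessor<E: Export> {
    exporter: Mutex<E>,
}

impl<E: Export> SimpleProcessor<E> {
    fn emit(&self, item: String) {
        // Export synchronously; a poisoned lock is reported rather than panicking.
        match self.exporter.lock() {
            Ok(mut exporter) => exporter.export(item),
            Err(_) => eprintln!("simple processor mutex poisoned"),
        }
    }

    fn shutdown(&self) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown();
        }
    }
}
```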
65 changes: 33 additions & 32 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
@@ -7,13 +7,15 @@ use crate::{
InstrumentationLibrary,
};
use async_trait::async_trait;
use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use std::fmt::{Display, Formatter};
use std::{
fmt::{Display, Formatter},
sync::{Arc, Mutex},
};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
@@ -40,42 +42,47 @@ pub fn new_test_export_span_data() -> SpanData {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TestSpanExporter {
tx_export: Sender<SpanData>,
tx_shutdown: Sender<()>,
pub export_called: Arc<Mutex<bool>>,
pub shutdown_called: Arc<Mutex<bool>>,
}

impl Default for TestSpanExporter {
fn default() -> Self {
Self::new()
}
}

impl TestSpanExporter {
pub fn new() -> Self {
TestSpanExporter {
export_called: Arc::new(Mutex::new(false)),
shutdown_called: Arc::new(Mutex::new(false)),
}
}

pub fn is_export_called(&self) -> bool {
*self.export_called.lock().unwrap()
}

pub fn is_shutdown_called(&self) -> bool {
*self.shutdown_called.lock().unwrap()
}
}

#[async_trait]
impl SpanExporter for TestSpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Box::pin(std::future::ready(Err(Into::into(err))));
}
}
fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
*self.export_called.lock().unwrap() = true;
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
let _ = self.tx_shutdown.send(()); // ignore error
*self.shutdown_called.lock().unwrap() = true;
}
}

pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
let (tx_export, rx_export) = unbounded();
let (tx_shutdown, rx_shutdown) = unbounded();
let exporter = TestSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
@@ -139,12 +146,6 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for TestExportError {
}
}

impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
fn from(err: SendError<T>) -> Self {
TestExportError(err.to_string())
}
}

/// A no-op instance of an [`SpanExporter`].
///
/// [`SpanExporter`]: crate::export::trace::SpanExporter
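The `TestSpanExporter` rework above depends on the exporter being `Clone` with its flags behind `Arc<Mutex<_>>`: a test hands a boxed clone to the processor and still observes calls through its own handle. A self-contained illustration of that sharing, using a hypothetical `Flags` struct:

```rust
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct Flags {
    export_called: Arc<Mutex<bool>>,
}

fn main() {
    let kept_by_test = Flags::default();
    let given_to_processor = kept_by_test.clone();

    // The clone shares the same Arc, so a write through one handle...
    *given_to_processor.export_called.lock().unwrap() = true;

    // ...is visible through the other.
    assert!(*kept_by_test.export_called.lock().unwrap());
}
```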
125 changes: 41 additions & 84 deletions opentelemetry-sdk/src/trace/span_processor.rs
@@ -50,7 +50,8 @@ use opentelemetry::{
Context,
};
use std::cmp::min;
use std::{env, fmt, str::FromStr, thread, time::Duration};
use std::sync::Mutex;
use std::{env, fmt, str::FromStr, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
@@ -93,65 +94,19 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn shutdown(&mut self) -> TraceResult<()>;
}

/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as
/// soon as they are finished, without any batching.
/// A [SpanProcessor] that passes finished spans to the configured
/// `SpanExporter`, as soon as they are finished, without any batching. This is
/// typically useful for debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchSpanProcessor].
#[derive(Debug)]
pub struct SimpleSpanProcessor {
message_sender: crossbeam_channel::Sender<Message>,
exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();
});

Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));
}
}
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
Self {
exporter: Mutex::new(exporter),
}
}
}
@@ -166,34 +121,34 @@ impl SpanProcessor for SimpleSpanProcessor {
return;
}

if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);
}
}

fn force_flush(&self) -> TraceResult<()> {
self.signal(Message::Flush, "flushing");

// Nothing to flush for simple span processor.
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
self.signal(Message::Shutdown, "shutting down");

Ok(())
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))
}
}
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
// Expecting to address that separately in the future."")
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
@@ -707,6 +662,7 @@

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
// cargo test trace::span_processor::tests:: --features=trace,testing
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
Expand All @@ -715,7 +671,7 @@ mod tests {
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::runtime;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter,
};
use crate::trace::span_processor::{
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
Expand All @@ -729,17 +685,17 @@ mod tests {

#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
processor.on_end(new_test_export_span_data());
assert!(rx_export.recv().is_ok());
assert!(exporter.is_export_called());
let _result = processor.shutdown();
}

#[test]
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
Expand All @@ -756,15 +712,16 @@ mod tests {
instrumentation_lib: Default::default(),
};
processor.on_end(unsampled);
assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
assert!(!exporter.is_export_called());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
assert!(!exporter.is_shutdown_called());
let _result = processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
assert!(exporter.is_shutdown_called());
}

#[test]
@@ -863,7 +820,7 @@
(OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
Expand All @@ -883,7 +840,7 @@ mod tests {
env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
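Because the simple processor now runs the export future with `futures_executor::block_on` while holding the mutex, every span end blocks the calling thread, which is why the updated doc comments point throughput-sensitive users at `BatchSpanProcessor`. A hedged sketch of that alternative, assuming the `rt-tokio` feature, a Tokio runtime, and the `opentelemetry-stdout` exporter:

```rust
use opentelemetry::trace::{Tracer, TracerProvider as _};
use opentelemetry_sdk::{runtime, trace::TracerProvider};

#[tokio::main]
async fn main() {
    // Spans are queued and exported in the background at a configured interval,
    // so ending a span does not block on the exporter.
    let exporter = opentelemetry_stdout::SpanExporter::default();
    let provider = TracerProvider::builder()
        .with_batch_exporter(exporter, runtime::Tokio)
        .build();

    let tracer = provider.tracer("example");
    tracer.in_span("work", |_cx| {
        // span body
    });

    // Flush queued spans before the runtime shuts down.
    for result in provider.force_flush() {
        let _ = result;
    }
}
```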