Global Log handler cleanup - Logs SDK #2184

Merged

Changes from 16 commits
67 changes: 51 additions & 16 deletions opentelemetry-sdk/src/logs/log_emitter.rs
@@ -1,9 +1,8 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::otel_warn;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_debug,
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
@@ -24,15 +23,14 @@
inner: Arc::new(LoggerProviderInner {
processors: Vec::new(),
resource: Resource::empty(),
is_shutdown: AtomicBool::new(true),

}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

#[derive(Debug, Clone)]
/// Creator for `Logger` instances.
pub struct LoggerProvider {
inner: Arc<LoggerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

/// Default logger name if empty string is provided.
@@ -73,7 +71,7 @@

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
@@ -105,6 +103,7 @@
/// Shuts down this `LoggerProvider`
pub fn shutdown(&self) -> LogResult<()> {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
@@ -114,24 +113,36 @@
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
otel_warn!(
name: "logger_provider_shutdown_error",
error = format!("{:?}", err)
);
match err {

// Specific handling for mutex poisoning
LogError::MutexPoisoned(_) => {
otel_debug!(
name: "LoggerProvider.Shutdown.MutexPoisoned",
);

}
_ => {
otel_debug!(

name: "LoggerProvider.Shutdown.Error",
error = format!("{err}")

);
}
}
errs.push(err);
}
}

if errs.is_empty() {
Ok(())
} else {
// consolidate errors from all the processors - not all may be user errors
Err(LogError::Other(format!("{errs:?}").into()))
}
} else {
otel_warn!(
name: "logger_provider_already_shutdown"
let error = LogError::AlreadyShutdown("LoggerProvider".to_string());
otel_debug!(
name: "LoggerProvider.Shutdown.AlreadyShutdown",
Member: this looks redundant, as the returned Result already contains this information. If we need to keep this, then maybe keep it at debug/info level.

@lalitb (Member, Author) Oct 9, 2024: So if the error is returned as a Result to the user, do we decide not to log it even if this could be actionable?

Member: Not to log, or log at a very low severity. Do you see any reason why this should be logged, given the Result already carries this information? "Internal logging" is more for things that are otherwise hard to figure out, right?

@lalitb (Member, Author) Oct 9, 2024: I prefer logging at a lower severity (debug?) for internal debugging. When we request logs from the customer at the debug level, they should include everything necessary for troubleshooting, even if the customer disregards the shutdown result.

Member: That is a good point! Agree with debug! here. It won't cause noise in normal operations, but when we ask users to report bugs/issues, we can ask them to collect all logs, including debug, so we'll have enough to investigate.

@lalitb (Member, Author): Changed this to debug now.

);
Err(LogError::Other("logger provider already shut down".into()))
Err(error)
}
}
}
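The thread above settles on debug-level internal logging for the already-shutdown case, so the event only surfaces when an application opts into debug logs. As a rough, hypothetical sketch (not part of this diff), an application could collect those events as follows, assuming the crate's internal-logs feature is enabled and tracing plus tracing-subscriber are dependencies:

use opentelemetry_sdk::logs::LoggerProvider;

fn main() {
    // Route the SDK's internal `tracing` events to stderr at DEBUG level, so events
    // such as "LoggerProvider.Shutdown.AlreadyShutdown" become visible.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    let provider = LoggerProvider::builder().build();
    let _ = provider.shutdown();
    // The second call returns Err(LogError::AlreadyShutdown(..)) and also emits the
    // debug-level internal log discussed in the thread above.
    let _ = provider.shutdown();
}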
@@ -140,14 +151,38 @@
struct LoggerProviderInner {
processors: Vec<Box<dyn LogProcessor>>,
resource: Resource,
is_shutdown: AtomicBool,
}

impl Drop for LoggerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
if self
Member: I agree with this change, but please make it its own separate PR, so we can keep this PR strictly for replacing the global error handler with the internal logger.

@lalitb (Member, Author): This was done in response to the comment - #2184 (comment). Should we keep it in this PR with a separate changelog entry?

Member: I always prefer short, very focused PRs! One can easily review them and continuously make progress by merging short PRs quickly. Slow and steady!

.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
for processor in &mut self.processors {
if let Err(err) = processor.shutdown() {
match err {

// Specific handling for mutex poisoning
LogError::MutexPoisoned(_) => {
otel_debug!(
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
);

}
_ => {
otel_debug!(

name: "LoggerProvider.Drop.ShutdownError",
error = format!("{err}")

);
}
}
}
}
} else {
otel_debug!(
name: "LoggerProvider.Drop.AlreadyShutdown",
);
}
}
}
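To illustrate the intent of the guard above (an assumed usage sequence, not code from this PR): the is_shutdown flag now lives on the shared inner state, so an explicit shutdown() flips it once, and the later Drop of LoggerProviderInner takes the AlreadyShutdown branch instead of shutting the processors down a second time.

use opentelemetry_sdk::logs::LoggerProvider;

fn main() {
    let provider = LoggerProvider::builder().build();
    // Processors are shut down exactly once, here.
    let _ = provider.shutdown();
    // Dropping the last handle later finds is_shutdown already true and only emits
    // the "LoggerProvider.Drop.AlreadyShutdown" debug event.
    drop(provider);
}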
@@ -202,8 +237,8 @@
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
resource,
is_shutdown: AtomicBool::new(false),
}),
is_shutdown: Arc::new(AtomicBool::new(false)),
};

// invoke set_resource on all the processors
90 changes: 34 additions & 56 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -12,9 +12,8 @@
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_error, otel_warn, InstrumentationLibrary,
otel_debug, otel_error, otel_warn, InstrumentationLibrary,
};

use std::sync::atomic::AtomicBool;
@@ -99,26 +98,36 @@
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
otel_warn!(
name: "simple_log_processor_emit_after_shutdown"
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
);
return;
}

let result = self
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
otel_error!(
name: "simple_log_processor_emit_error",
error = format!("{:?}", err)
);
global::handle_error(err);
// Handle errors with specific static names
match result {

Err(LogError::MutexPoisoned(_)) => {
// logging as debug as this is not a user error
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);

}
Err(err) => {
otel_error!(

name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)

);
}
_ => {}
}
}

@@ -133,12 +142,7 @@
exporter.shutdown();
Ok(())
} else {
otel_error!(
name: "simple_log_processor_shutdown_error"
);
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))

}
}

@@ -170,12 +174,12 @@
instrumentation.clone(),
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if let Err(err) = result {
otel_error!(
name: "batch_log_processor_emit_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
Member: Will this be triggered when the channel is full? If yes, we need to rethink this, as it can spam the log output.

@lalitb (Member, Author): Yes, this error only triggers when the channel is full or closed. We need to add some throttling logic to prevent flooding - I have added the TODO for now, as we need a common strategy for such flooding.

Member: Agree we need a common strategy, but let's remove the error log from here. It'll flood as-is when the buffer is full.

@lalitb (Member, Author): Should we remove it altogether, or make it otel_debug for now - with a comment to change it back to otel_error once throttling is ready?

Member: Either is fine with me, though I slightly prefer removing it altogether, as I don't know if we can ship a throttling solution for the next release.

error = format!("{}", err)

);
global::handle_error(LogError::Other(err.into()));
}
}
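The review thread above leaves throttling as a TODO. Purely as an illustration of one possible strategy - not something this PR implements - a counter-based throttle could cap how often the "channel full" condition is reported; eprintln! stands in for the SDK's otel_error! macro so the sketch stays self-contained:

use std::sync::atomic::{AtomicUsize, Ordering};

static DROPPED: AtomicUsize = AtomicUsize::new(0);
const LOG_EVERY_N_DROPS: usize = 1000;

fn record_dropped_log() {
    // fetch_add returns the previous count, so the very first drop is reported,
    // then only every LOG_EVERY_N_DROPS-th drop after that.
    let dropped = DROPPED.fetch_add(1, Ordering::Relaxed);
    if dropped % LOG_EVERY_N_DROPS == 0 {
        eprintln!(
            "BatchLogProcessor export channel full or closed; {} records dropped so far",
            dropped + 1
        );
    }
}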

Expand Down Expand Up @@ -243,10 +247,9 @@

if let Err(err) = result {
otel_error!(
name: "batch_log_processor_export_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)

);
global::handle_error(err);
}
}
}
@@ -261,24 +264,12 @@
.await;

if let Some(channel) = res_channel {
if let Err(result) = channel.send(result) {
global::handle_error(LogError::from(format!(
"failed to send flush result: {:?}",
result
)));
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", result),
message = "Failed to send flush result"
if let Err(send_error) = channel.send(result) {
otel_debug!(

name: "BatchLogProcessor.Flush.SendResultError",
error = format!("{:?}", send_error),

);
}
} else if let Err(err) = result {
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", err),
message = "Flush failed"
);
global::handle_error(err);
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
@@ -293,21 +284,14 @@

exporter.shutdown();

if let Err(result) = ch.send(result) {
otel_error!(
name: "batch_log_processor_shutdown_error",
error = format!("{:?}", result),
message = "Failed to send shutdown result"
if let Err(send_error) = ch.send(result) {
otel_debug!(

name: "BatchLogProcessor.Shutdown.SendResultError",
error = format!("{:?}", send_error),

);
global::handle_error(LogError::from(format!(
"failed to send batch processor shutdown result: {:?}",
result
)));
}

break;
}

// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
@@ -357,13 +341,7 @@
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => {
otel_error!(
name: "export_with_timeout_timeout",
timeout_duration = time_out.as_millis()
);
ExportResult::Err(LogError::ExportTimedOut(time_out))
}
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),

}
}

43 changes: 41 additions & 2 deletions opentelemetry/src/global/internal_logging.rs
@@ -50,11 +50,30 @@ macro_rules! otel_warn {
{
tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), "");
}
#[cfg(not(feature = "internal-logs"))]
Member: Agree with this change, but for the future - please put changes like this into their own separate PR to make it easier to review.

{
#[allow(unused_variables)]
{

}
}
};
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::warn!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
{
let _ = ($name, $($value),+);
}
}
};
}
@@ -104,11 +123,31 @@ macro_rules! otel_error {
{
tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), "");
}
#[cfg(not(feature = "internal-logs"))]
{
#[allow(unused_variables)]
{

}
}
};
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::error!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
{
let _ = ($name, $($value),+);

}
}
};
}
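For reference, a hypothetical call site for the macro arms above (the event name is invented; these macros are aimed at SDK and exporter crates, and the sketch assumes tracing is a dependency when the internal-logs feature is on). With the feature disabled, the arguments are simply evaluated into a discarded tuple so no unused-variable warnings are produced:

use opentelemetry::otel_error;

fn report_export_failure(err: &dyn std::error::Error) {
    // Expands to a `tracing::error!` event when "internal-logs" is enabled;
    // otherwise the name and fields are evaluated and dropped.
    otel_error!(
        name: "MyComponent.Export.Error",
        error = format!("{}", err),
    );
}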
8 changes: 8 additions & 0 deletions opentelemetry/src/logs/mod.rs
@@ -30,6 +30,14 @@ pub enum LogError {
#[error("Exporter timed out after {} seconds", .0.as_secs())]
ExportTimedOut(Duration),

/// Processor is already shutdown
#[error("{0} already shutdown")]
AlreadyShutdown(String),

/// Mutex lock poisoning
#[error("mutex lock poisioning for {0}")]
MutexPoisoned(String),

/// Other errors propagated from log SDK that weren't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
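
The two new variants give callers something concrete to match on. A small hypothetical example of handling them when shutting down a provider (the messages are invented for illustration):

use opentelemetry::logs::LogError;
use opentelemetry_sdk::logs::LoggerProvider;

fn shutdown_provider(provider: &LoggerProvider) {
    match provider.shutdown() {
        Ok(()) => {}
        Err(LogError::AlreadyShutdown(component)) => {
            // Usually indicates the caller invoked shutdown twice.
            eprintln!("{component} was already shut down");
        }
        Err(LogError::MutexPoisoned(component)) => {
            eprintln!("internal mutex poisoned while shutting down {component}");
        }
        Err(other) => eprintln!("shutdown failed: {other}"),
    }
}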