Skip to content

Commit

Permalink
Improve internal opentelemetry logging (#2128)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 4, 2024
1 parent 86dd486 commit 2ff7ec0
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 78 deletions.
4 changes: 2 additions & 2 deletions examples/self-diagnostics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ publish = false

[dependencies]
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["rt-tokio", "experimental-internal-logs"]}
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["rt-tokio"]}
opentelemetry-stdout = { path = "../../opentelemetry-stdout"}
opentelemetry-appender-tracing = { path = "../../opentelemetry-appender-tracing"}
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter","registry", "std"]}
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["http-proto", "reqwest-client", "logs", "experimental-internal-logs"] }
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["http-proto", "reqwest-client", "logs"] }
once_cell ={ version = "1.19.0"}
ctrlc = "3.4"
1 change: 1 addition & 0 deletions opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std"] }
log = { workspace = true }
opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] }
tracing-subscriber = { workspace = true, features = ["registry", "std", "env-filter"] }
tracing-log = "0.2"
async-trait = { workspace = true }
criterion = { workspace = true }
Expand Down
30 changes: 19 additions & 11 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,26 @@ mod tests {
use opentelemetry_sdk::trace;
use opentelemetry_sdk::trace::{Sampler, TracerProvider};
use tracing::error;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::Layer;

/// Reports whether `log_record` carries an attribute matching both `key` and `value`.
///
/// Walks the record's attributes and returns `true` on the first pair whose
/// key equals `key` and whose value equals `value`; returns `false` when no
/// such pair exists (including when the record has no attributes).
pub fn attributes_contains(log_record: &LogRecord, key: &Key, value: &AnyValue) -> bool {
    for (attr_key, attr_value) in log_record.attributes_iter() {
        if attr_key == key && attr_value == value {
            return true;
        }
    }
    false
}

/// Builds the subscriber shared by the tests in this module.
///
/// The returned subscriber is a `tracing_subscriber` registry with a single
/// `OpenTelemetryTracingBridge` layer wired to `logger_provider`.
///
/// `_exporter` is accepted but not used by this helper.
fn create_tracing_subscriber(
    _exporter: InMemoryLogsExporter,
    logger_provider: &LoggerProvider,
) -> impl tracing::Subscriber {
    // Filter purely on log level (WARN and ERROR are captured); no filtering
    // is applied based on the event target.
    let bridge = layer::OpenTelemetryTracingBridge::new(logger_provider)
        .with_filter(tracing_subscriber::filter::LevelFilter::WARN);

    tracing_subscriber::registry().with(bridge)
}

// cargo test --features=testing
#[test]
fn tracing_appender_standalone() {
Expand All @@ -234,8 +246,7 @@ mod tests {
.with_simple_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
let subscriber = create_tracing_subscriber(exporter.clone(), &logger_provider);

// avoiding setting tracing subscriber as global as that does not
// play well with unit tests.
Expand Down Expand Up @@ -315,8 +326,7 @@ mod tests {
.with_simple_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
let subscriber = create_tracing_subscriber(exporter.clone(), &logger_provider);

// avoiding setting tracing subscriber as global as that does not
// play well with unit tests.
Expand Down Expand Up @@ -427,16 +437,15 @@ mod tests {
.with_simple_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
let subscriber = create_tracing_subscriber(exporter.clone(), &logger_provider);

// avoiding setting tracing subscriber as global as that does not
// play well with unit tests.
let _guard = tracing::subscriber::set_default(subscriber);
drop(tracing_log::LogTracer::init());

// Act
log::error!("log from log crate");
log::error!(target: "my-system", "log from log crate");
logger_provider.force_flush();

// Assert TODO: move to helper methods
Expand Down Expand Up @@ -493,8 +502,7 @@ mod tests {
.with_simple_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
let subscriber = create_tracing_subscriber(exporter.clone(), &logger_provider);

// avoiding setting tracing subscriber as global as that does not
// play well with unit tests.
Expand All @@ -513,7 +521,7 @@ mod tests {
let span_id = cx.span().span_context().span_id();

// logging is done inside span context.
log::error!("log from log crate");
log::error!(target: "my-system", "log from log crate");
(trace_id, span_id)
});

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace", "opentelemetry-proto/
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics", "opentelemetry-proto/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs", "opentelemetry-proto/logs"]
populate-logs-event-name = ["opentelemetry-proto/populate-logs-event-name"]
experimental-internal-logs = ["tracing"]
internal-logs = ["tracing"]

# add ons
serialize = ["serde", "serde_json"]

default = ["grpc-tonic", "trace", "metrics", "logs"]
default = ["grpc-tonic", "trace", "metrics", "logs", "internal-logs"]

# grpc using tonic
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
Expand Down
24 changes: 1 addition & 23 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,29 +294,7 @@ impl TemporalitySelector for MetricsExporter {
#[async_trait]
impl PushMetricsExporter for MetricsExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name = "export_metrics",
target = "opentelemetry-otlp",
metrics_count = metrics
.scope_metrics
.iter()
.map(|scope| scope.metrics.len())
.sum::<usize>(),
status = "started"
);
let result = self.client.export(metrics).await;
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name = "export_metrics",
target = "opentelemetry-otlp",
status = if result.is_ok() {
"completed"
} else {
"failed"
}
);
result
self.client.export(metrics).await
}

async fn force_flush(&self) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ temp-env = { workspace = true }
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[features]
default = ["trace", "metrics", "logs"]
default = ["trace", "metrics", "logs", "internal-logs"]
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
Expand All @@ -52,7 +52,7 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std",
rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]
experimental-internal-logs = ["tracing"]
internal-logs = ["tracing"]

[[bench]]
name = "context"
Expand Down
9 changes: 8 additions & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::otel_warn;
use opentelemetry::{
global,
logs::{LogError, LogResult},
Expand Down Expand Up @@ -49,7 +50,6 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {
attributes: Option<Vec<opentelemetry::KeyValue>>,
) -> Logger {
let name = name.into();

let component_name = if name.is_empty() {
Cow::Borrowed(DEFAULT_COMPONENT_NAME)
} else {
Expand Down Expand Up @@ -114,6 +114,10 @@ impl LoggerProvider {
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
otel_warn!(
name: "logger_provider_shutdown_error",
error = format!("{:?}", err)
);
errs.push(err);
}
}
Expand All @@ -124,6 +128,9 @@ impl LoggerProvider {
Err(LogError::Other(format!("{errs:?}").into()))
}
} else {
otel_warn!(
name: "logger_provider_already_shutdown"
);
Err(LogError::Other("logger provider already shut down".into()))
}
}
Expand Down
59 changes: 42 additions & 17 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use opentelemetry::logs::Severity;
use opentelemetry::{
global,
logs::{LogError, LogResult},
InstrumentationLibrary,
otel_error, otel_warn, InstrumentationLibrary,
};

use std::sync::atomic::AtomicBool;
use std::{cmp::min, env, sync::Mutex};
use std::{
Expand Down Expand Up @@ -98,16 +99,12 @@ impl LogProcessor for SimpleLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
otel_warn!(
name: "simple_log_processor_emit_after_shutdown"
);
return;
}

#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name: "simple_log_processor_emit",
target: "opentelemetry-sdk",
event_name = record.event_name
);

let result = self
.exporter
.lock()
Expand All @@ -117,6 +114,10 @@ impl LogProcessor for SimpleLogProcessor {
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
otel_error!(
name: "simple_log_processor_emit_error",
error = format!("{:?}", err)
);
global::handle_error(err);
}
}
Expand All @@ -132,6 +133,9 @@ impl LogProcessor for SimpleLogProcessor {
exporter.shutdown();
Ok(())
} else {
otel_error!(
name: "simple_log_processor_shutdown_error"
);
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
Expand Down Expand Up @@ -167,6 +171,10 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
)));

if let Err(err) = result {
otel_error!(
name: "batch_log_processor_emit_error",
error = format!("{:?}", err)
);
global::handle_error(LogError::Other(err.into()));
}
}
Expand Down Expand Up @@ -224,13 +232,6 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
// Log has finished, add to buffer of pending logs.
BatchMessage::ExportLog(log) => {
logs.push(log);
#[cfg(feature = "experimental-internal-logs")]
tracing::debug!(
name: "batch_log_processor_record_count",
target: "opentelemetry-sdk",
current_batch_size = logs.len()
);

if logs.len() == config.max_export_batch_size {
let result = export_with_timeout(
config.max_export_timeout,
Expand All @@ -241,6 +242,10 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
.await;

if let Err(err) = result {
otel_error!(
name: "batch_log_processor_export_error",
error = format!("{:?}", err)
);
global::handle_error(err);
}
}
Expand All @@ -261,8 +266,18 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
"failed to send flush result: {:?}",
result
)));
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", result),
message = "Failed to send flush result"
);
}
} else if let Err(err) = result {
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", err),
message = "Flush failed"
);
global::handle_error(err);
}
}
Expand All @@ -279,6 +294,11 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
exporter.shutdown();

if let Err(result) = ch.send(result) {
otel_error!(
name: "batch_log_processor_shutdown_error",
error = format!("{:?}", result),
message = "Failed to send shutdown result"
);
global::handle_error(LogError::from(format!(
"failed to send batch processor shutdown result: {:?}",
result
Expand All @@ -295,7 +315,6 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}
}));

// Return batch processor with link to worker
BatchLogProcessor { message_sender }
}
Expand Down Expand Up @@ -338,7 +357,13 @@ where
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
Either::Right((_, _)) => {
otel_error!(
name: "export_with_timeout_timeout",
timeout_duration = time_out.as_millis()
);
ExportResult::Err(LogError::ExportTimedOut(time_out))
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::metrics::MetricsError;
use opentelemetry::{global, KeyValue};
use opentelemetry::{global, otel_warn, KeyValue};

use crate::metrics::AttributeSet;

Expand Down Expand Up @@ -147,6 +147,9 @@ impl<AU: AtomicallyUpdate<T>, T: Number, O: Operation> ValueMap<AU, T, O> {
O::update_tracker(&new_tracker, measurement, index);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
}
}
}
Expand Down
Loading

0 comments on commit 2ff7ec0

Please sign in to comment.