Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve internal opentelemetry logging #2128

Merged
merged 38 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7da284e
initial commit
lalitb Sep 18, 2024
573994d
instrument internal code
lalitb Sep 19, 2024
1bfd464
instrument..
lalitb Sep 19, 2024
90470c3
instrument..
lalitb Sep 19, 2024
5f1b990
update example
lalitb Sep 19, 2024
47abb6d
Merge branch 'main' into log-handler-using-tracing
lalitb Sep 19, 2024
46ce8af
update example
lalitb Sep 19, 2024
bbfc03c
Merge branch 'log-handler-using-tracing' of github.com:lalitb/opentel…
lalitb Sep 19, 2024
e5260d9
changes for filtering
lalitb Sep 19, 2024
a79b636
Merge branch 'main' into log-handler-using-tracing
TommyCpp Sep 21, 2024
197b12a
use structured logging
lalitb Sep 23, 2024
91e56ea
review comments
lalitb Sep 27, 2024
fa7148e
update self diagnostic
lalitb Sep 27, 2024
a78357c
allow internal events in pipeline
lalitb Sep 27, 2024
a3fba36
Merge branch 'main' into log-handler-using-tracing
lalitb Sep 27, 2024
54e0755
fix macros
lalitb Oct 1, 2024
4c8cb42
revert appender-tracing example
lalitb Oct 1, 2024
5c4bab9
Merge branch 'main' into log-handler-using-tracing
lalitb Oct 1, 2024
5e95e00
update instrument
lalitb Oct 2, 2024
da3716d
cont..
lalitb Oct 2, 2024
62b9f83
lint fix
lalitb Oct 2, 2024
591c45a
fix macro to dump operation name as name keyt
lalitb Oct 2, 2024
b6e2327
Merge branch 'main' into log-handler-using-tracing
lalitb Oct 2, 2024
2714508
fix unit test for appender-tracing
lalitb Oct 3, 2024
b0c129c
Merge branch 'log-handler-using-tracing' of github.com:lalitb/opentel…
lalitb Oct 3, 2024
83763b1
fix unit tests, and doc test
lalitb Oct 3, 2024
e325316
add macros under feature flag
lalitb Oct 3, 2024
19822b1
make feature default
lalitb Oct 3, 2024
6b98c16
fix build
lalitb Oct 3, 2024
d248750
fix lint
lalitb Oct 3, 2024
f9e3d2d
lint
lalitb Oct 3, 2024
d4c1eb7
use eprintln for warn and error if internal-logs flag is disabled
lalitb Oct 3, 2024
d5120a2
review comments
lalitb Oct 3, 2024
c103467
remove tracing from opentelemetry crate, and use name as metadata
lalitb Oct 3, 2024
b7470b1
fix
lalitb Oct 3, 2024
30f7a4c
remove tracing from otel dev dep
lalitb Oct 3, 2024
adc3fd2
remove debug
lalitb Oct 3, 2024
3c4d20c
Update opentelemetry-sdk/src/metrics/internal/mod.rs
lalitb Oct 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ rust-version = "1.65"

[dependencies]
log = { workspace = true, optional = true }
opentelemetry = { version = "0.25", path = "../opentelemetry", features = ["logs"] }
opentelemetry = { version = "0.25", path = "../opentelemetry", features = ["logs","experimental-internal-debugging"] }
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
tracing-log = { version = "0.2", optional = true }
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "ansi", "env-filter"] }

[dev-dependencies]
log = { workspace = true }
opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing", "experimental-internal-debugging"] }
tracing-log = "0.2"
async-trait = { workspace = true }
criterion = { workspace = true }
Expand Down
21 changes: 17 additions & 4 deletions opentelemetry-appender-tracing/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use opentelemetry::KeyValue;
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::{logs::LoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::filter::{filter_fn, EnvFilter};
use tracing_subscriber::fmt;
use tracing_subscriber::layer::Layer;
use tracing_subscriber::prelude::*;

fn main() {
Expand All @@ -15,9 +18,19 @@ fn main() {
)]))
.with_simple_exporter(exporter)
.build();
let layer = layer::OpenTelemetryTracingBridge::new(&provider);
tracing_subscriber::registry().with(layer).init();

error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message");
let env_filter = EnvFilter::from_default_env();
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider);
let internal_log_layer = fmt::Layer::default()
.with_writer(std::io::stdout) // Writes to stdout
.pretty()
.with_filter(filter_fn(|meta| meta.target().starts_with("opentelemetry"))) // Custom filter function
.with_filter(env_filter);
tracing_subscriber::registry()
.with(internal_log_layer)
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
.with(otel_layer)
.init();
for i in 0..3 {
error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message", iteration = i);
}
let _ = provider.shutdown();
}
3 changes: 3 additions & 0 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ where
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
if event.metadata().target().starts_with("opentelemetry") {
return;
}
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "experimental_metadata_attributes")]
let normalized_meta = event.normalized_metadata();

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ url = { workspace = true, optional = true }
tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio-stream = { workspace = true, optional = true }
http = { workspace = true, optional = true }
tracing = {workspace = true, optional = true}

[package.metadata.docs.rs]
all-features = true
Expand All @@ -51,6 +52,7 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std",
rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]
experimental-internal-debugging = ["opentelemetry/experimental-internal-debugging", "tracing"]

[[bench]]
name = "context"
Expand Down
24 changes: 24 additions & 0 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
use opentelemetry::{otel_debug, otel_info, otel_warn};

#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
Expand Down Expand Up @@ -49,6 +50,9 @@
attributes: Option<Vec<opentelemetry::KeyValue>>,
) -> Logger {
let name = name.into();
otel_info!(target: "opentelemetry-sdk", name: "logger_versioned_creation", signal: "log",
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
"Creating a new versioned logger with name: {:?}, version: {:?}, schema_url: {:?}, attributes: {:?}",

Check warning on line 54 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L54

Added line #L54 was not covered by tests
name, version, schema_url, attributes);

let component_name = if name.is_empty() {
Cow::Borrowed(DEFAULT_COMPONENT_NAME)
Expand All @@ -72,6 +76,8 @@
}

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
otel_info!(target: "opentelemetry-sdk", name: "logger_library_logger", signal: "log",
"Creating a library logger for library: {:?}", library);

Check warning on line 80 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L80

Added line #L80 was not covered by tests
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
Expand All @@ -83,19 +89,23 @@
impl LoggerProvider {
/// Create a new `LoggerProvider` builder.
pub fn builder() -> Builder {
otel_info!(target: "opentelemetry-sdk", name: "logger_provider_builder", signal: "log", "Creating a new LoggerProvider builder.");
Builder::default()
}

pub(crate) fn log_processors(&self) -> &[Box<dyn LogProcessor>] {
otel_debug!(target: "opentelemetry-sdk", name: "logger_provider_log_processors", signal: "log", "Retrieving {} log processors.", self.inner.processors.len());
&self.inner.processors
}

pub(crate) fn resource(&self) -> &Resource {
otel_debug!(target: "opentelemetry-sdk", name: "logger_provider_resource", signal: "log", "Retrieving {} resource for LoggerProvider.", self.inner.resource.len());
&self.inner.resource
}

/// Force flush all remaining logs in log processors and return results.
pub fn force_flush(&self) -> Vec<LogResult<()>> {
otel_info!(target: "opentelemetry-sdk", name: "logger_provider_force_flush", signal: "log", "Forcing flush for all log processors.");
self.log_processors()
.iter()
.map(|processor| processor.force_flush())
Expand All @@ -104,6 +114,7 @@

/// Shuts down this `LoggerProvider`
pub fn shutdown(&self) -> LogResult<()> {
otel_info!(target: "opentelemetry-sdk", name: "logger_provider_shutdown", signal: "log", "Shutting down LoggerProvider.");
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
Expand All @@ -114,6 +125,7 @@
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
otel_warn!(target: "opentelemetry-sdk", name: "logger_provider_shutdown_error", signal: "log", "Error while shutting down a log processor: {:?}", err);

Check warning on line 128 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L128

Added line #L128 was not covered by tests
errs.push(err);
}
}
Expand All @@ -124,6 +136,7 @@
Err(LogError::Other(format!("{errs:?}").into()))
}
} else {
otel_warn!(target: "opentelemetry-sdk", name: "logger_provider_already_shutdown", signal: "log", "LoggerProvider is already shut down.");
Err(LogError::Other("logger provider already shut down".into()))
}
}
Expand Down Expand Up @@ -200,6 +213,8 @@
};

// invoke set_resource on all the processors
otel_debug!(target: "opentelemetry-sdk", name: "logger_provider_build", signal: "log",
"Setting resource for logger provider and applying it to processors.");

Check warning on line 217 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L217

Added line #L217 was not covered by tests
for processor in logger_provider.log_processors() {
processor.set_resource(logger_provider.resource());
}
Expand All @@ -221,6 +236,7 @@
instrumentation_lib: Arc<InstrumentationLibrary>,
provider: LoggerProvider,
) -> Self {
otel_info!(target: "opentelemetry-sdk", name: "logger_new", signal: "log", "Creating a new Logger.");
Logger {
instrumentation_lib,
provider,
Expand All @@ -247,6 +263,12 @@

/// Emit a `LogRecord`.
fn emit(&self, mut record: Self::LogRecord) {
otel_debug!(
target: "opentelemetry-sdk",
name: "log_record_emit_start",
signal: "log",
"Starting the process of emitting a log record"
);
let provider = self.provider();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
Expand Down Expand Up @@ -280,6 +302,8 @@
self.instrumentation_library().name.as_ref(),
);
}
otel_debug!(target: "opentelemetry-sdk", name: "log_record_event_enabled_check", signal: "log", "log event enabled status is {} for target: {} ", enabled, target);

enabled
}
}
Expand Down
20 changes: 20 additions & 0 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
logs::{LogError, LogResult},
InstrumentationLibrary,
};
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};
use std::sync::atomic::AtomicBool;
use std::{cmp::min, env, sync::Mutex};
use std::{
Expand Down Expand Up @@ -87,6 +88,7 @@

impl SimpleLogProcessor {
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
otel_info!(target: "opentelemetry-sdk", name: "simple_log_processor_new", signal: "log", "Initializing SimpleLogProcessor.");
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
Expand All @@ -98,9 +100,12 @@
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
otel_warn!(target: "opentelemetry-sdk", name: "simple_log_processor_emit_after_shutdown", signal: "log", "Attempted to emit log after processor shutdown.");
return;
}

otel_debug!(target: "opentelemetry-sdk", name: "simple_log_processor_emit", signal: "log", "Emitting log record.");

let result = self
.exporter
.lock()
Expand All @@ -110,28 +115,33 @@
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
otel_error!(target: "opentelemetry-sdk", name: "simple_log_processor_emit_error", signal: "log", "Failed to emit log record: {:?}", err);

Check warning on line 118 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L118

Added line #L118 was not covered by tests
global::handle_error(err);
}
}

fn force_flush(&self) -> LogResult<()> {
otel_info!(target: "opentelemetry-sdk", name: "simple_log_processor_force_flush", signal: "log", "Forcing flush on log processor.");
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
otel_info!(target: "opentelemetry-sdk", name: "simple_log_processor_shutdown", signal: "log", "Shutting down SimpleLogProcessor.");
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
otel_error!(target: "opentelemetry-sdk", name: "simple_log_processor_shutdown_error", signal: "log", "Failed to shutdown SimpleLogProcessor.");

Check warning on line 136 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L136

Added line #L136 was not covered by tests
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
}
}

fn set_resource(&self, resource: &Resource) {
otel_info!(target: "opentelemetry-sdk", name: "simple_log_processor_set_resource", signal: "log", "Setting resource for SimpleLogProcessor.");
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
Expand All @@ -154,17 +164,20 @@

impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
otel_info!(target: "opentelemetry-sdk", name: "batch_log_processor_emit", signal: "log", "Emitting log record in BatchLogProcessor.");
let result = self.message_sender.try_send(BatchMessage::ExportLog((
record.clone(),
instrumentation.clone(),
)));

if let Err(err) = result {
otel_error!(target: "opentelemetry-sdk", name: "batch_log_processor_emit_error", signal: "log", "Failed to send log message to BatchLogProcessor: {:?}", err);
global::handle_error(LogError::Other(err.into()));
}
}

fn force_flush(&self) -> LogResult<()> {
otel_info!(target: "opentelemetry-sdk", name: "batch_log_processor_force_flush", signal: "log", "Forcing flush in BatchLogProcessor.");
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Flush(Some(res_sender)))
Expand All @@ -176,6 +189,7 @@
}

fn shutdown(&self) -> LogResult<()> {
otel_info!(target: "opentelemetry-sdk", name: "batch_log_processor_shutdown", signal: "log", "Shutting down BatchLogProcessor.");
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
Expand All @@ -187,6 +201,7 @@
}

fn set_resource(&self, resource: &Resource) {
otel_info!(target: "opentelemetry-sdk", name: "batch_log_processor_set_resource", signal: "log", "Setting resource for BatchLogProcessor.");
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
Expand Down Expand Up @@ -217,8 +232,11 @@
// Log has finished, add to buffer of pending logs.
BatchMessage::ExportLog(log) => {
logs.push(log);
otel_debug!(target: "opentelemetry-sdk", name: "batch_log_processor_record_count", signal: "log", "Batching log records. Current batch size: {}", logs.len());


if logs.len() == config.max_export_batch_size {
otel_info!(target: "opentelemetry-sdk", name: "batch_log_processor_export", signal: "log", "Exporting log batch of size: {}", logs.len());

Check warning on line 239 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L239

Added line #L239 was not covered by tests
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
Expand All @@ -228,6 +246,7 @@
.await;

if let Err(err) = result {
otel_error!(target: "opentelemetry-sdk", name: "batch_log_processor_export_error", signal: "log", "Failed to export log batch: {:?}", err);

Check warning on line 249 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L249

Added line #L249 was not covered by tests
global::handle_error(err);
}
}
Expand Down Expand Up @@ -319,6 +338,7 @@
.iter()
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
otel_debug!(target: "opentelemetry-sdk", name: "export_with_timeout", signal: "log", "Exporting log batch of size: {}", log_vec.len());
let export = exporter.export(LogBatch::new(log_vec.as_slice()));
let timeout = runtime.delay(time_out);
pin_mut!(export);
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ futures-sink = "0.3"
once_cell = { workspace = true }
pin-project-lite = { workspace = true, optional = true }
thiserror = { workspace = true }
tracing = { version = "0.1", optional = true }

[target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies]
js-sys = "0.3.63"
Expand All @@ -38,6 +39,7 @@ testing = ["trace", "metrics"]
logs = []
logs_level_enabled = ["logs"]
otel_unstable = []
experimental-internal-debugging = ["tracing"]

[dev-dependencies]
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs_level_enabled"]} # for documentation tests
Expand Down
Loading
Loading