diff --git a/Cargo.lock b/Cargo.lock index 0133c8356466..de1b1218cada 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,6 +206,16 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -1010,6 +1020,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "boxcar" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42" + [[package]] name = "bstr" version = "1.5.0" @@ -2433,6 +2449,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gettid" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397256552fed4a9e577850498071831ec8f18ea83368aecc114cab469dcb43e5" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "gimli" version = "0.31.1" @@ -4212,6 +4238,16 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "papaya" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc7c76487f7eaa00a0fc1d7f88dc6b295aec478d11b0fc79f857b62c2874124c" +dependencies = [ + "equivalent", + "seize", +] + [[package]] name = "parking" version = "2.1.1" @@ -4839,6 +4875,7 @@ dependencies = [ "ahash", "anyhow", "arc-swap", + "assert-json-diff", "async-compression", "async-trait", "atomic-take", @@ -4846,6 +4883,7 @@ dependencies = [ "aws-sdk-iam", "aws-sigv4", "base64 0.13.1", + "boxcar", "bstr", "bytes", "camino", @@ -4862,6 +4900,7 @@ dependencies = [ "flate2", "framed-websockets", "futures", + "gettid", "hashbrown 0.14.5", "hashlink", "hex", @@ -4884,7 +4923,9 @@ dependencies = [ "measured", "metrics", "once_cell", + "opentelemetry", "p256 0.13.2", + "papaya", "parking_lot 0.12.1", "parquet", "parquet_derive", @@ -4931,6 +4972,9 @@ dependencies = [ "tokio-tungstenite 0.21.0", "tokio-util", "tracing", + "tracing-log", + "tracing-opentelemetry", + "tracing-serde", "tracing-subscriber", "tracing-utils", "try-lock", @@ -5884,6 +5928,16 @@ dependencies = [ "libc", ] +[[package]] +name = "seize" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "semver" version = "1.0.17" @@ -8145,6 +8199,7 @@ dependencies = [ "tower 0.4.13", "tracing", "tracing-core", + "tracing-log", "url", "zerocopy", "zeroize", diff --git a/Cargo.toml b/Cargo.toml index 267a91d773c4..76b54ae1d877 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] } atomic-take = "1.1.0" backtrace = "0.3.74" flate2 = "1.0.26" +assert-json-diff = "2" async-stream = "0.3" async-trait = "0.1" aws-config = { version = "1.5", default-features = false, features=["rustls", "sso"] } @@ -193,7 +194,9 @@ tower-http = { version = "0.6.2", features = ["request-id", "trace"] } tower-service = "0.3.3" tracing = "0.1" tracing-error = "0.2" +tracing-log = "0.2" tracing-opentelemetry = "0.28" +tracing-serde = "0.2.0" tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] } try-lock = "0.2.5" twox-hash = { version = "1.6.3", default-features = false } diff --git a/deny.toml b/deny.toml index df00a34c60d6..b55140556801 100644 --- a/deny.toml +++ b/deny.toml @@ -32,6 +32,7 @@ reason = "the marvin attack only affects private key decryption, not public key # https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html [licenses] allow = [ + "0BSD", "Apache-2.0", "BSD-2-Clause", "BSD-3-Clause", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 35574e945cd9..d7880ea7b964 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -19,6 +19,7 @@ aws-config.workspace = true aws-sdk-iam.workspace = true aws-sigv4.workspace = true base64.workspace = true +boxcar = "0.2.8" bstr.workspace = true bytes = { workspace = true, features = ["serde"] } camino.workspace = true @@ -42,6 +43,7 @@ hyper0.workspace = true hyper = { workspace = true, features = ["server", "http1", "http2"] } hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] } http-body-util = { version = "0.1" } +gettid = "0.1.3" indexmap = { workspace = true, features = ["serde"] } ipnet.workspace = true itertools.workspace = true @@ -50,6 +52,8 @@ lasso = { workspace = true, features = ["multi-threaded"] } measured = { workspace = true, features = ["lasso"] } metrics.workspace = true once_cell.workspace = true +opentelemetry = { workspace = true, features = ["trace"] } +papaya = "0.1.8" parking_lot.workspace = true parquet.workspace = true parquet_derive.workspace = true @@ -89,6 +93,9 @@ tokio = { workspace = true, features = ["signal"] } tracing-subscriber.workspace = true tracing-utils.workspace = true tracing.workspace = true +tracing-log.workspace = true +tracing-serde.workspace = true +tracing-opentelemetry.workspace = true try-lock.workspace = true typed-json.workspace = true url.workspace = true @@ -112,6 +119,7 @@ rsa = "0.9" workspace_hack.workspace = true [dev-dependencies] +assert-json-diff.workspace = true camino-tempfile.workspace = true fallible-iterator.workspace = true flate2.workspace = true diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 41f10f052ffa..97c9f5a59c27 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,10 +1,23 @@ -use tracing::Subscriber; +use std::cell::{Cell, RefCell}; +use std::collections::HashMap; +use std::hash::BuildHasher; +use std::{env, io}; + +use chrono::{DateTime, Utc}; +use opentelemetry::trace::TraceContextExt; +use scopeguard::defer; +use serde::ser::{SerializeMap, Serializer}; +use tracing::span; +use tracing::subscriber::Interest; +use tracing::{callsite, Event, Metadata, Span, Subscriber}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use tracing_subscriber::fmt::format::{Format, Full}; use tracing_subscriber::fmt::time::SystemTime; use tracing_subscriber::fmt::{FormatEvent, FormatFields}; +use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::prelude::*; -use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::registry::{LookupSpan, SpanRef}; /// Initialize logging and OpenTelemetry tracing and exporter. /// @@ -15,6 +28,8 @@ use tracing_subscriber::registry::LookupSpan; /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. /// See pub async fn init() -> anyhow::Result { + let logfmt = LogFormat::from_env()?; + let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy() @@ -29,17 +44,36 @@ pub async fn init() -> anyhow::Result { .expect("this should be a valid filter directive"), ); - let fmt_layer = tracing_subscriber::fmt::layer() - .with_ansi(false) - .with_writer(std::io::stderr) - .with_target(false); - let otlp_layer = tracing_utils::init_tracing("proxy").await; + let json_log_layer = if logfmt == LogFormat::Json { + Some(JsonLoggingLayer { + clock: RealClock, + skipped_field_indices: papaya::HashMap::default(), + writer: StderrWriter { + stderr: std::io::stderr(), + }, + }) + } else { + None + }; + + let text_log_layer = if logfmt == LogFormat::Text { + Some( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_writer(std::io::stderr) + .with_target(false), + ) + } else { + None + }; + tracing_subscriber::registry() .with(env_filter) .with(otlp_layer) - .with(fmt_layer) + .with(json_log_layer) + .with(text_log_layer) .try_init()?; Ok(LoggingGuard) @@ -94,3 +128,857 @@ impl Drop for LoggingGuard { tracing_utils::shutdown_tracing(); } } + +// TODO: make JSON the default +#[derive(Copy, Clone, PartialEq, Eq, Default, Debug)] +enum LogFormat { + #[default] + Text = 1, + Json, +} + +impl LogFormat { + fn from_env() -> anyhow::Result { + let logfmt = env::var("LOGFMT"); + Ok(match logfmt.as_deref() { + Err(_) => LogFormat::default(), + Ok("text") => LogFormat::Text, + Ok("json") => LogFormat::Json, + Ok(logfmt) => anyhow::bail!("unknown log format: {logfmt}"), + }) + } +} + +trait MakeWriter { + fn make_writer(&self) -> impl io::Write; +} + +struct StderrWriter { + stderr: io::Stderr, +} + +impl MakeWriter for StderrWriter { + #[inline] + fn make_writer(&self) -> impl io::Write { + self.stderr.lock() + } +} + +// TODO: move into separate module or even separate crate. +trait Clock { + fn now(&self) -> DateTime; +} + +struct RealClock; + +impl Clock for RealClock { + #[inline] + fn now(&self) -> DateTime { + Utc::now() + } +} + +/// Name of the field used by tracing crate to store the event message. +const MESSAGE_FIELD: &str = "message"; + +thread_local! { + /// Protects against deadlocks and double panics during log writing. + /// The current panic handler will use tracing to log panic information. + static REENTRANCY_GUARD: Cell = const { Cell::new(false) }; + /// Thread-local instance with per-thread buffer for log writing. + static EVENT_FORMATTER: RefCell = RefCell::new(EventFormatter::new()); + /// Cached OS thread ID. + static THREAD_ID: u64 = gettid::gettid(); +} + +/// Implements tracing layer to handle events specific to logging. +struct JsonLoggingLayer { + clock: C, + skipped_field_indices: papaya::HashMap, + writer: W, +} + +impl Layer for JsonLoggingLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + use std::io::Write; + + // TODO: consider special tracing subscriber to grab timestamp very + // early, before OTel machinery, and add as event extension. + let now = self.clock.now(); + + let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| { + if entered.get() { + let mut formatter = EventFormatter::new(); + formatter.format(now, event, &ctx, &self.skipped_field_indices)?; + self.writer.make_writer().write_all(formatter.buffer()) + } else { + entered.set(true); + defer!(entered.set(false);); + + EVENT_FORMATTER.with_borrow_mut(move |formatter| { + formatter.reset(); + formatter.format(now, event, &ctx, &self.skipped_field_indices)?; + self.writer.make_writer().write_all(formatter.buffer()) + }) + } + }); + + // In case logging fails we generate a simpler JSON object. + if let Err(err) = res { + if let Ok(mut line) = serde_json::to_vec(&serde_json::json!( { + "timestamp": now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + "level": "ERROR", + "message": format_args!("cannot log event: {err:?}"), + "fields": { + "event": format_args!("{event:?}"), + }, + })) { + line.push(b'\n'); + self.writer.make_writer().write_all(&line).ok(); + } + } + } + + /// Registers a SpanFields instance as span extension. + fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("span must exist"); + let fields = SpanFields::default(); + fields.record_fields(attrs); + // This could deadlock when there's a panic somewhere in the tracing + // event handling and a read or write guard is still held. This includes + // the OTel subscriber. + span.extensions_mut().insert(fields); + } + + fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("span must exist"); + let ext = span.extensions(); + if let Some(data) = ext.get::() { + data.record_fields(values); + } + } + + /// Called (lazily) whenever a new log call is executed. We quickly check + /// for duplicate field names and record duplicates as skippable. Last one + /// wins. + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + if !metadata.is_event() { + // Must not be never because we wouldn't get trace and span data. + return Interest::always(); + } + + let mut field_indices = SkippedFieldIndices::default(); + let mut seen_fields = HashMap::<&'static str, usize>::new(); + for field in metadata.fields() { + use std::collections::hash_map::Entry; + match seen_fields.entry(field.name()) { + Entry::Vacant(entry) => { + // field not seen yet + entry.insert(field.index()); + } + Entry::Occupied(mut entry) => { + // replace currently stored index + let old_index = entry.insert(field.index()); + // ... and append it to list of skippable indices + field_indices.push(old_index); + } + } + } + + if !field_indices.is_empty() { + self.skipped_field_indices + .pin() + .insert(metadata.callsite(), field_indices); + } + + Interest::always() + } +} + +/// Stores span field values recorded during the spans lifetime. +#[derive(Default)] +struct SpanFields { + // TODO: Switch to custom enum with lasso::Spur for Strings? + fields: papaya::HashMap<&'static str, serde_json::Value>, +} + +impl SpanFields { + #[inline] + fn record_fields(&self, fields: R) { + fields.record(&mut SpanFieldsRecorder { + fields: self.fields.pin(), + }); + } +} + +/// Implements a tracing field visitor to convert and store values. +struct SpanFieldsRecorder<'m, S, G> { + fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>, +} + +impl tracing::field::Visit for SpanFieldsRecorder<'_, S, G> { + #[inline] + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } + + #[inline] + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } + + #[inline] + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } + + #[inline] + fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { + if let Ok(value) = i64::try_from(value) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } else { + self.fields + .insert(field.name(), serde_json::Value::from(format!("{value}"))); + } + } + + #[inline] + fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { + if let Ok(value) = u64::try_from(value) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } else { + self.fields + .insert(field.name(), serde_json::Value::from(format!("{value}"))); + } + } + + #[inline] + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } + + #[inline] + fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } + + #[inline] + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.fields + .insert(field.name(), serde_json::Value::from(value)); + } + + #[inline] + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.fields + .insert(field.name(), serde_json::Value::from(format!("{value:?}"))); + } + + #[inline] + fn record_error( + &mut self, + field: &tracing::field::Field, + value: &(dyn std::error::Error + 'static), + ) { + self.fields + .insert(field.name(), serde_json::Value::from(format!("{value}"))); + } +} + +/// List of field indices skipped during logging. Can list duplicate fields or +/// metafields not meant to be logged. +#[derive(Clone, Default)] +struct SkippedFieldIndices { + bits: u64, +} + +impl SkippedFieldIndices { + #[inline] + fn is_empty(&self) -> bool { + self.bits == 0 + } + + #[inline] + fn push(&mut self, index: usize) { + self.bits |= 1u64 + .checked_shl(index as u32) + .expect("field index too large"); + } + + #[inline] + fn contains(&self, index: usize) -> bool { + self.bits + & 1u64 + .checked_shl(index as u32) + .expect("field index too large") + != 0 + } +} + +/// Formats a tracing event and writes JSON to its internal buffer including a newline. +// TODO: buffer capacity management, truncate if too large +struct EventFormatter { + logline_buffer: Vec, +} + +impl EventFormatter { + #[inline] + fn new() -> Self { + EventFormatter { + logline_buffer: Vec::new(), + } + } + + #[inline] + fn buffer(&self) -> &[u8] { + &self.logline_buffer + } + + #[inline] + fn reset(&mut self) { + self.logline_buffer.clear(); + } + + fn format( + &mut self, + now: DateTime, + event: &Event<'_>, + ctx: &Context<'_, S>, + skipped_field_indices: &papaya::HashMap, + ) -> io::Result<()> + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + let timestamp = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); + + use tracing_log::NormalizeEvent; + let normalized_meta = event.normalized_metadata(); + let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata()); + + let skipped_field_indices = skipped_field_indices.pin(); + let skipped_field_indices = skipped_field_indices.get(&meta.callsite()); + + let mut serialize = || { + let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer); + + let mut serializer = serializer.serialize_map(None)?; + + // Timestamp comes first, so raw lines can be sorted by timestamp. + serializer.serialize_entry("timestamp", ×tamp)?; + + // Level next. + serializer.serialize_entry("level", &meta.level().as_str())?; + + // Message next. + serializer.serialize_key("message")?; + let mut message_extractor = + MessageFieldExtractor::new(serializer, skipped_field_indices); + event.record(&mut message_extractor); + let mut serializer = message_extractor.into_serializer()?; + + let mut fields_present = FieldsPresent(false, skipped_field_indices); + event.record(&mut fields_present); + if fields_present.0 { + serializer.serialize_entry( + "fields", + &SerializableEventFields(event, skipped_field_indices), + )?; + } + + let pid = std::process::id(); + if pid != 1 { + serializer.serialize_entry("process_id", &pid)?; + } + + THREAD_ID.with(|tid| serializer.serialize_entry("thread_id", tid))?; + + // TODO: tls cache? name could change + if let Some(thread_name) = std::thread::current().name() { + if !thread_name.is_empty() && thread_name != "tokio-runtime-worker" { + serializer.serialize_entry("thread_name", thread_name)?; + } + } + + if let Some(task_id) = tokio::task::try_id() { + serializer.serialize_entry("task_id", &format_args!("{task_id}"))?; + } + + serializer.serialize_entry("target", meta.target())?; + + if let Some(module) = meta.module_path() { + if module != meta.target() { + serializer.serialize_entry("module", module)?; + } + } + + if let Some(file) = meta.file() { + if let Some(line) = meta.line() { + serializer.serialize_entry("src", &format_args!("{file}:{line}"))?; + } else { + serializer.serialize_entry("src", file)?; + } + } + + { + let otel_context = Span::current().context(); + let otel_spanref = otel_context.span(); + let span_context = otel_spanref.span_context(); + if span_context.is_valid() { + serializer.serialize_entry( + "trace_id", + &format_args!("{}", span_context.trace_id()), + )?; + } + } + + serializer.serialize_entry("spans", &SerializableSpanStack(ctx))?; + + serializer.end() + }; + + serialize().map_err(io::Error::other)?; + self.logline_buffer.push(b'\n'); + Ok(()) + } +} + +/// Extracts the message field that's mixed will other fields. +struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> { + serializer: S, + skipped_field_indices: Option<&'a SkippedFieldIndices>, + state: Option>, +} + +impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> { + #[inline] + fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + Self { + serializer, + skipped_field_indices, + state: None, + } + } + + #[inline] + fn into_serializer(mut self) -> Result { + match self.state { + Some(Ok(())) => {} + Some(Err(err)) => return Err(err), + None => self.serializer.serialize_value("")?, + } + Ok(self.serializer) + } + + #[inline] + fn accept_field(&self, field: &tracing::field::Field) -> bool { + self.state.is_none() + && field.name() == MESSAGE_FIELD + && !self + .skipped_field_indices + .is_some_and(|i| i.contains(field.index())) + } +} + +impl tracing::field::Visit for MessageFieldExtractor<'_, S> { + #[inline] + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&format_args!("{value:x?}"))); + } + } + + #[inline] + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&value)); + } + } + + #[inline] + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&format_args!("{value:?}"))); + } + } + + #[inline] + fn record_error( + &mut self, + field: &tracing::field::Field, + value: &(dyn std::error::Error + 'static), + ) { + if self.accept_field(field) { + self.state = Some(self.serializer.serialize_value(&format_args!("{value}"))); + } + } +} + +/// Checks if there's any fields and field values present. If not, the JSON subobject +/// can be skipped. +// This is entirely optional and only cosmetic, though maybe helps a +// bit during log parsing in dashboards when there's no field with empty object. +struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>); + +// Even though some methods have an overhead (error, bytes) it is assumed the +// compiler won't include this since we ignore the value entirely. +impl tracing::field::Visit for FieldsPresent<'_> { + #[inline] + fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) { + if !self.1.is_some_and(|i| i.contains(field.index())) + && field.name() != MESSAGE_FIELD + && !field.name().starts_with("log.") + { + self.0 |= true; + } + } +} + +/// Serializes the fields directly supplied with a log event. +struct SerializableEventFields<'a, 'event>( + &'a tracing::Event<'event>, + Option<&'a SkippedFieldIndices>, +); + +impl serde::ser::Serialize for SerializableEventFields<'_, '_> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeMap; + let serializer = serializer.serialize_map(None)?; + let mut message_skipper = MessageFieldSkipper::new(serializer, self.1); + self.0.record(&mut message_skipper); + let serializer = message_skipper.into_serializer()?; + serializer.end() + } +} + +/// A tracing field visitor that skips the message field. +struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> { + serializer: S, + skipped_field_indices: Option<&'a SkippedFieldIndices>, + state: Result<(), S::Error>, +} + +impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> { + #[inline] + fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self { + Self { + serializer, + skipped_field_indices, + state: Ok(()), + } + } + + #[inline] + fn accept_field(&self, field: &tracing::field::Field) -> bool { + self.state.is_ok() + && field.name() != MESSAGE_FIELD + && !field.name().starts_with("log.") + && !self + .skipped_field_indices + .is_some_and(|i| i.contains(field.index())) + } + + #[inline] + fn into_serializer(self) -> Result { + self.state?; + Ok(self.serializer) + } +} + +impl tracing::field::Visit for MessageFieldSkipper<'_, S> { + #[inline] + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_i128(&mut self, field: &tracing::field::Field, value: i128) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_u128(&mut self, field: &tracing::field::Field, value: u128) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) { + if self.accept_field(field) { + self.state = self + .serializer + .serialize_entry(field.name(), &format_args!("{value:x?}")); + } + } + + #[inline] + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if self.accept_field(field) { + self.state = self.serializer.serialize_entry(field.name(), &value); + } + } + + #[inline] + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if self.accept_field(field) { + self.state = self + .serializer + .serialize_entry(field.name(), &format_args!("{value:?}")); + } + } + + #[inline] + fn record_error( + &mut self, + field: &tracing::field::Field, + value: &(dyn std::error::Error + 'static), + ) { + if self.accept_field(field) { + self.state = self.serializer.serialize_value(&format_args!("{value}")); + } + } +} + +/// Serializes the span stack from root to leaf (parent of event) enumerated +/// inside an object where the keys are just the number padded with zeroes +/// to retain sorting order. +// The object is necessary because Loki cannot flatten arrays. +struct SerializableSpanStack<'a, 'b, Span>(&'b Context<'a, Span>) +where + Span: Subscriber + for<'lookup> LookupSpan<'lookup>; + +impl serde::ser::Serialize for SerializableSpanStack<'_, '_, Span> +where + Span: Subscriber + for<'lookup> LookupSpan<'lookup>, +{ + fn serialize(&self, serializer: Ser) -> Result + where + Ser: serde::ser::Serializer, + { + let mut serializer = serializer.serialize_map(None)?; + + if let Some(leaf_span) = self.0.lookup_current() { + for (i, span) in leaf_span.scope().from_root().enumerate() { + serializer.serialize_entry(&format_args!("{i:02}"), &SerializableSpan(&span))?; + } + } + + serializer.end() + } +} + +/// Serializes a single span. Include the span ID, name and its fields as +/// recorded up to this point. +struct SerializableSpan<'a, 'b, Span>(&'b SpanRef<'a, Span>) +where + Span: for<'lookup> LookupSpan<'lookup>; + +impl serde::ser::Serialize for SerializableSpan<'_, '_, Span> +where + Span: for<'lookup> LookupSpan<'lookup>, +{ + fn serialize(&self, serializer: Ser) -> Result + where + Ser: serde::ser::Serializer, + { + let mut serializer = serializer.serialize_map(None)?; + // TODO: the span ID is probably only useful for debugging tracing. + serializer.serialize_entry("span_id", &format_args!("{:016x}", self.0.id().into_u64()))?; + serializer.serialize_entry("span_name", self.0.metadata().name())?; + + let ext = self.0.extensions(); + if let Some(data) = ext.get::() { + for (key, value) in &data.fields.pin() { + serializer.serialize_entry(key, value)?; + } + } + + serializer.end() + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use std::sync::{Arc, Mutex, MutexGuard}; + + use assert_json_diff::assert_json_eq; + use tracing::info_span; + + use super::*; + + struct TestClock { + current_time: Mutex>, + } + + impl Clock for Arc { + fn now(&self) -> DateTime { + *self.current_time.lock().expect("poisoned") + } + } + + struct VecWriter<'a> { + buffer: MutexGuard<'a, Vec>, + } + + impl MakeWriter for Arc>> { + fn make_writer(&self) -> impl io::Write { + VecWriter { + buffer: self.lock().expect("poisoned"), + } + } + } + + impl io::Write for VecWriter<'_> { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buffer.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + #[test] + fn test_field_collection() { + let clock = Arc::new(TestClock { + current_time: Mutex::new(Utc::now()), + }); + let buffer = Arc::new(Mutex::new(Vec::new())); + let log_layer = JsonLoggingLayer { + clock: clock.clone(), + skipped_field_indices: papaya::HashMap::default(), + writer: buffer.clone(), + }; + + let registry = tracing_subscriber::Registry::default().with(log_layer); + + tracing::subscriber::with_default(registry, || { + info_span!("span1", x = 40, x = 41, x = 42).in_scope(|| { + info_span!("span2").in_scope(|| { + tracing::error!( + a = 1, + a = 2, + a = 3, + message = "explicit message field", + "implicit message field" + ); + }); + }); + }); + + let buffer = Arc::try_unwrap(buffer) + .expect("no other reference") + .into_inner() + .expect("poisoned"); + let actual: serde_json::Value = serde_json::from_slice(&buffer).expect("valid JSON"); + let expected: serde_json::Value = serde_json::json!( + { + "timestamp": clock.now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + "level": "ERROR", + "message": "explicit message field", + "fields": { + "a": 3, + }, + "spans": { + "00":{ + "span_id": "0000000000000001", + "span_name": "span1", + "x": 42, + }, + "01": { + "span_id": "0000000000000002", + "span_name": "span2", + } + }, + "src": actual.as_object().unwrap().get("src").unwrap().as_str().unwrap(), + "target": "proxy::logging::tests", + "process_id": actual.as_object().unwrap().get("process_id").unwrap().as_number().unwrap(), + "thread_id": actual.as_object().unwrap().get("thread_id").unwrap().as_number().unwrap(), + "thread_name": "logging::tests::test_field_collection", + } + ); + + assert_json_eq!(actual, expected); + } +} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index a3dffa8f195a..2c65401154a8 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -92,6 +92,7 @@ tonic = { version = "0.12", default-features = false, features = ["codegen", "pr tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } +tracing-log = { version = "0.2" } url = { version = "2", features = ["serde"] } zerocopy = { version = "0.7", features = ["derive", "simd"] } zeroize = { version = "1", features = ["derive", "serde"] }