Skip to content

Commit

Permalink
feat(telemetry): Add a compatibility layer that emits app logs as tra…
Browse files Browse the repository at this point in the history
…cing events.

Signed-off-by: Caleb Schoepp <caleb.schoepp@fermyon.com>
  • Loading branch information
calebschoepp committed May 14, 2024
1 parent c44da5f commit fc1f835
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions crates/telemetry/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opentelemetry_otlp::{
const OTEL_SDK_DISABLED: &str = "OTEL_SDK_DISABLED";
const OTEL_EXPORTER_OTLP_TRACES_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL";
const OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL";
const APP_LOG_TO_TRACING_EVENT_DISABLED: &str = "APP_LOG_TO_TRACING_EVENT_DISABLED";

/// Returns a boolean indicating if the OTEL tracing layer should be enabled.
///
Expand Down Expand Up @@ -37,6 +38,15 @@ pub(crate) fn otel_metrics_enabled() -> bool {
]) && !otel_sdk_disabled()
}

/// Returns a boolean indicating if the compatibility layer that emits tracing events from
/// applications logs should be disabled.
///
/// It is considered disabled if the environment variable `APP_LOG_TO_TRACING_EVENT_DISABLED` is set and not
/// empty. By default it is enabled.
pub(crate) fn app_log_to_tracing_event_disabled() -> bool {
any_vars_set(&[APP_LOG_TO_TRACING_EVENT_DISABLED])
}

fn any_vars_set(enabling_vars: &[&str]) -> bool {
enabling_vars
.iter()
Expand Down
8 changes: 7 additions & 1 deletion crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tracing_subscriber::{fmt, prelude::*, registry, EnvFilter, Layer};

pub mod detector;
mod env;
pub mod log;
pub mod metrics;
mod propagation;
mod traces;
Expand Down Expand Up @@ -51,9 +52,14 @@ pub fn init(spin_version: String) -> anyhow::Result<ShutdownGuard> {
.with_writer(std::io::stderr)
.with_ansi(std::io::stderr().is_terminal())
.with_filter(
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
EnvFilter::from_default_env()
// Wasmtime is too noisy
.add_directive("wasmtime_wasi_http=warn".parse()?)
.add_directive("watchexec=off".parse()?),
// Watchexec is too noisy
.add_directive("watchexec=off".parse()?)
// We don't want to duplicate application logs
.add_directive("[{app_log}]=off".parse()?),
);

// Even if metrics or tracing aren't enabled we're okay to turn on the global error handler
Expand Down
18 changes: 18 additions & 0 deletions crates/telemetry/src/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::sync::OnceLock;

use crate::env;

/// Takes a Spin application log and emits it as a tracing event. This acts as a compatibility layer
/// to easily get Spin app logs as events∆1 in our OTel traces.
pub fn app_log_to_tracing_event(buf: &[u8]) {
static CELL: OnceLock<bool> = OnceLock::new();
if *CELL.get_or_init(env::app_log_to_tracing_event_disabled) {
return;
}

if let Ok(s) = std::str::from_utf8(buf) {
tracing::info!(app_log = s);
} else {
tracing::info!(app_log = "Application log: <non-utf8 data>");
}
}
1 change: 1 addition & 0 deletions crates/trigger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ spin-world = { path = "../world" }
spin-llm = { path = "../llm" }
spin-llm-local = { path = "../llm-local", optional = true }
spin-llm-remote-http = { path = "../llm-remote-http" }
spin-telemetry = { path = "../telemetry" }
sanitize-filename = "0.4"
serde = "1.0.188"
serde_json = "1.0"
Expand Down
215 changes: 150 additions & 65 deletions crates/trigger/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,23 @@ impl StdioLoggingTriggerHooks {
&self,
component_id: &str,
log_suffix: &str,
log_dir: &Path,
log_dir: Option<&Path>,
) -> Result<ComponentStdioWriter> {
let sanitized_component_id = sanitize_filename::sanitize(component_id);
let log_path = log_dir.join(format!("{sanitized_component_id}_{log_suffix}.txt"));
let mut log_path: Option<PathBuf> = None;
if let Some(log_dir) = log_dir {
log_path = Some(log_dir.join(format!(
"{sanitized_component_id}_{log_suffix}.txt",
sanitized_component_id = sanitized_component_id,
log_suffix = log_suffix
)))
};
let log_path = log_path.as_deref();

let follow = self.follow_components.should_follow(component_id);
ComponentStdioWriter::new(&log_path, follow)
.with_context(|| format!("Failed to open log file {}", quoted_path(&log_path)))
// It is safe to unwrap the log_path b/c new will only fail if the file can't be opened in which case log_path is Some
ComponentStdioWriter::new(log_path, follow)
.with_context(|| format!("Failed to open log file {}", quoted_path(log_path.unwrap())))
}

fn validate_follows(&self, app: &spin_app::App) -> anyhow::Result<()> {
Expand Down Expand Up @@ -112,27 +122,36 @@ impl TriggerHooks for StdioLoggingTriggerHooks {
component: &spin_app::AppComponent,
builder: &mut spin_core::StoreBuilder,
) -> anyhow::Result<()> {
match &self.log_dir {
Some(l) => {
builder.stdout_pipe(self.component_stdio_writer(component.id(), "stdout", l)?);
builder.stderr_pipe(self.component_stdio_writer(component.id(), "stderr", l)?);
}
None => {
builder.inherit_stdout();
builder.inherit_stderr();
}
}
builder.stdout_pipe(self.component_stdio_writer(
component.id(),
"stdout",
self.log_dir.as_deref(),
)?);
builder.stderr_pipe(self.component_stdio_writer(
component.id(),
"stderr",
self.log_dir.as_deref(),
)?);

Ok(())
}
}

/// ComponentStdioWriter forwards output to a log file and (optionally) stderr
pub struct ComponentStdioWriter {
sync_file: std::fs::File,
async_file: tokio::fs::File,
state: ComponentStdioWriterState,
follow: bool,
inner: ComponentStdioWriterInner,
}

enum ComponentStdioWriterInner {
/// TODO
Inherit,
/// TODO
Forward {
sync_file: std::fs::File,
async_file: tokio::fs::File,
state: ComponentStdioWriterState,
follow: bool,
},
}

#[derive(Debug)]
Expand All @@ -142,20 +161,37 @@ enum ComponentStdioWriterState {
}

impl ComponentStdioWriter {
pub fn new(log_path: &Path, follow: bool) -> anyhow::Result<Self> {
pub fn new(log_path: Option<&Path>, follow: bool) -> anyhow::Result<Self> {
match log_path {
Some(log_path) => Self::new_forward(log_path, follow),
None => Self::new_inherit(),
}
}

fn new_forward(log_path: &Path, follow: bool) -> anyhow::Result<Self> {
let sync_file = std::fs::File::options()
.create(true)
.append(true)
.open(log_path)?;

let async_file = sync_file
.try_clone()
.context("could not get async file handle")?
.into();

Ok(Self {
inner: ComponentStdioWriterInner::Forward {
sync_file,
async_file,
state: ComponentStdioWriterState::File,
follow,
},
})
}

fn new_inherit() -> anyhow::Result<Self> {
Ok(Self {
async_file,
sync_file,
state: ComponentStdioWriterState::File,
follow,
inner: ComponentStdioWriterInner::Inherit,
})
}
}
Expand All @@ -167,38 +203,56 @@ impl AsyncWrite for ComponentStdioWriter {
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
let this = self.get_mut();

loop {
match &this.state {
ComponentStdioWriterState::File => {
match &mut this.inner {
ComponentStdioWriterInner::Inherit => {
let written = futures::ready!(
std::pin::Pin::new(&mut this.async_file).poll_write(cx, buf)
std::pin::Pin::new(&mut tokio::io::stderr()).poll_write(cx, buf)
);
let written = match written {
Ok(e) => e,
Err(e) => return Poll::Ready(Err(e)),
};
if this.follow {
this.state = ComponentStdioWriterState::Follow(0..written);
} else {
return Poll::Ready(Ok(written));
}
}
ComponentStdioWriterState::Follow(range) => {
let written = futures::ready!(std::pin::Pin::new(&mut tokio::io::stderr())
.poll_write(cx, &buf[range.clone()]));
let written = match written {
Ok(e) => e,
Err(e) => return Poll::Ready(Err(e)),
};
if range.start + written >= range.end {
let end = range.end;
this.state = ComponentStdioWriterState::File;
return Poll::Ready(Ok(end));
} else {
this.state =
ComponentStdioWriterState::Follow((range.start + written)..range.end);
};
return Poll::Ready(Ok(written));
}
ComponentStdioWriterInner::Forward {
async_file,
state,
follow,
..
} => match &state {
ComponentStdioWriterState::File => {
let written =
futures::ready!(std::pin::Pin::new(async_file).poll_write(cx, buf));
let written = match written {
Ok(e) => e,
Err(e) => return Poll::Ready(Err(e)),
};
if *follow {
*state = ComponentStdioWriterState::Follow(0..written);
} else {
return Poll::Ready(Ok(written));
}
}
ComponentStdioWriterState::Follow(range) => {
let written = futures::ready!(std::pin::Pin::new(&mut tokio::io::stderr())
.poll_write(cx, &buf[range.clone()]));
let written = match written {
Ok(e) => e,
Err(e) => return Poll::Ready(Err(e)),
};
if range.start + written >= range.end {
let end = range.end;
*state = ComponentStdioWriterState::File;
return Poll::Ready(Ok(end));
} else {
*state = ComponentStdioWriterState::Follow(
(range.start + written)..range.end,
);
};
}
},
}
}
}
Expand All @@ -208,13 +262,19 @@ impl AsyncWrite for ComponentStdioWriter {
cx: &mut std::task::Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
let this = self.get_mut();
match this.state {
ComponentStdioWriterState::File => {
std::pin::Pin::new(&mut this.async_file).poll_flush(cx)
}
ComponentStdioWriterState::Follow(_) => {

match &mut this.inner {
ComponentStdioWriterInner::Inherit => {
std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx)
}
ComponentStdioWriterInner::Forward {
async_file, state, ..
} => match state {
ComponentStdioWriterState::File => std::pin::Pin::new(async_file).poll_flush(cx),
ComponentStdioWriterState::Follow(_) => {
std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx)
}
},
}
}

Expand All @@ -223,32 +283,57 @@ impl AsyncWrite for ComponentStdioWriter {
cx: &mut std::task::Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
let this = self.get_mut();
match this.state {
ComponentStdioWriterState::File => {
std::pin::Pin::new(&mut this.async_file).poll_shutdown(cx)
}
ComponentStdioWriterState::Follow(_) => {

match &mut this.inner {
ComponentStdioWriterInner::Inherit => {
std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx)
}
ComponentStdioWriterInner::Forward {
async_file, state, ..
} => match state {
ComponentStdioWriterState::File => std::pin::Pin::new(async_file).poll_shutdown(cx),
ComponentStdioWriterState::Follow(_) => {
std::pin::Pin::new(&mut tokio::io::stderr()).poll_flush(cx)
}
},
}
}
}

impl std::io::Write for ComponentStdioWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let written = self.sync_file.write(buf)?;
if self.follow {
std::io::stderr().write_all(&buf[..written])?;
spin_telemetry::log::app_log_to_tracing_event(buf);

match &mut self.inner {
ComponentStdioWriterInner::Inherit => {
std::io::stderr().write_all(buf)?;
Ok(buf.len())
}
ComponentStdioWriterInner::Forward {
sync_file, follow, ..
} => {
let written = sync_file.write(buf)?;
if *follow {
std::io::stderr().write_all(&buf[..written])?;
}
Ok(written)
}
}
Ok(written)
}

fn flush(&mut self) -> std::io::Result<()> {
self.sync_file.flush()?;
if self.follow {
std::io::stderr().flush()?;
match &mut self.inner {
ComponentStdioWriterInner::Inherit => std::io::stderr().flush(),
ComponentStdioWriterInner::Forward {
sync_file, follow, ..
} => {
sync_file.flush()?;
if *follow {
std::io::stderr().flush()?;
}
Ok(())
}
}
Ok(())
}
}

Expand Down
1 change: 1 addition & 0 deletions examples/spin-timer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit fc1f835

Please sign in to comment.