diff --git a/src/app.rs b/src/app.rs
index d20489b1c18c1..b745d831e07c3 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -28,7 +28,7 @@ use crate::{
     cli::{handle_config_errors, LogFormat, Opts, RootOpts},
     config::{self, Config, ConfigPath},
     heartbeat,
-    signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
+    signal::{ShutdownError, SignalHandler, SignalPair, SignalRx, SignalTo},
     topology::{
         self, ReloadOutcome, RunningTopology, SharedTopologyController, TopologyController,
     },
@@ -50,8 +50,8 @@ use tokio::sync::broadcast::error::RecvError;
 pub struct ApplicationConfig {
     pub config_paths: Vec<ConfigPath>,
     pub topology: RunningTopology,
-    pub graceful_crash_sender: mpsc::UnboundedSender<()>,
-    pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
+    pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
+    pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
     #[cfg(feature = "api")]
     pub api: config::api::Options,
     #[cfg(feature = "enterprise")]
@@ -141,9 +141,12 @@ impl ApplicationConfig {
                 Some(api_server)
             }
-            Err(e) => {
-                error!("An error occurred that Vector couldn't handle: {}.", e);
-                _ = self.graceful_crash_sender.send(());
+            Err(error) => {
+                let error = error.to_string();
+                error!("An error occurred that Vector couldn't handle: {}.", error);
+                _ = self
+                    .graceful_crash_sender
+                    .send(ShutdownError::ApiFailed { error });
                 None
             }
         }
@@ -256,7 +259,7 @@ impl Application {
 
 pub struct StartedApplication {
     pub config_paths: Vec<ConfigPath>,
-    pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
+    pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
     pub signals: SignalPair,
     pub topology_controller: SharedTopologyController,
     pub openssl_legacy_provider: Option<Provider>,
@@ -283,42 +286,19 @@ impl StartedApplication {
         let signal = loop {
             tokio::select! {
-                signal = signal_rx.recv() => {
-                    match signal {
-                        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
-                            let mut topology_controller = topology_controller.lock().await;
-                            let new_config = config_builder.build().map_err(handle_config_errors).ok();
-                            if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
-                                break SignalTo::Shutdown;
-                            }
-                        }
-                        Ok(SignalTo::ReloadFromDisk) => {
-                            let mut topology_controller = topology_controller.lock().await;
-
-                            // Reload paths
-                            if let Some(paths) = config::process_paths(&config_paths) {
-                                topology_controller.config_paths = paths;
-                            }
-
-                            // Reload config
-                            let new_config = config::load_from_paths_with_provider_and_secrets(&topology_controller.config_paths, &mut signal_handler)
-                                .await
-                                .map_err(handle_config_errors).ok();
-
-                            if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
-                                break SignalTo::Shutdown;
-                            }
-                        },
-                        Err(RecvError::Lagged(amt)) => warn!("Overflow, dropped {} signals.", amt),
-                        Err(RecvError::Closed) => break SignalTo::Shutdown,
-                        Ok(signal) => break signal,
-                    }
-                }
+                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
+                    signal,
+                    &topology_controller,
+                    &config_paths,
+                    &mut signal_handler,
+                ).await {
+                    break signal;
+                },
                 // Trigger graceful shutdown if a component crashed, or all sources have ended.
-                _ = graceful_crash.next() => break SignalTo::Shutdown,
+                error = graceful_crash.next() => break SignalTo::Shutdown(error),
                 _ = TopologyController::sources_finished(topology_controller.clone()) => {
                     info!("All sources have finished.");
-                    break SignalTo::Shutdown
+                    break SignalTo::Shutdown(None)
                 },
                 else => unreachable!("Signal streams never end"),
             }
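
In the hunk above, `graceful_crash` is the stream side of `graceful_crash_sender`, so typing the channel as `UnboundedSender<ShutdownError>` lets the select arm forward the crash reason straight into `SignalTo::Shutdown(error)` instead of a bare unit. A minimal, self-contained sketch of that pattern (illustrative names only, not Vector code; assumes the `tokio` and `tokio-stream` crates):

```rust
// Standalone sketch: an unbounded channel doubling as a crash reporter.
// `Some(err)` from the stream is a component's fatal error; `None` means
// every sender has been dropped.
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};

#[derive(Debug)]
enum ShutdownError {
    SinkAborted { key: String, error: String },
}

#[tokio::main]
async fn main() {
    let (crash_tx, crash_rx) = tokio::sync::mpsc::unbounded_channel::<ShutdownError>();
    let mut graceful_crash = UnboundedReceiverStream::new(crash_rx);

    // A failing component reports its reason before dying.
    tokio::spawn(async move {
        let _ = crash_tx.send(ShutdownError::SinkAborted {
            key: "out".into(),
            error: "connection refused".into(),
        });
    });

    // The application loop now receives the reason, not `()`.
    let error = graceful_crash.next().await;
    println!("shutting down: {error:?}");
}
```
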
@@ -333,6 +313,52 @@ impl StartedApplication {
     }
 }
 
+async fn handle_signal(
+    signal: Result<SignalTo, RecvError>,
+    topology_controller: &SharedTopologyController,
+    config_paths: &[ConfigPath],
+    signal_handler: &mut SignalHandler,
+) -> Option<SignalTo> {
+    match signal {
+        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
+            let mut topology_controller = topology_controller.lock().await;
+            let new_config = config_builder.build().map_err(handle_config_errors).ok();
+            match topology_controller.reload(new_config).await {
+                ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
+                _ => None,
+            }
+        }
+        Ok(SignalTo::ReloadFromDisk) => {
+            let mut topology_controller = topology_controller.lock().await;
+
+            // Reload paths
+            if let Some(paths) = config::process_paths(config_paths) {
+                topology_controller.config_paths = paths;
+            }
+
+            // Reload config
+            let new_config = config::load_from_paths_with_provider_and_secrets(
+                &topology_controller.config_paths,
+                signal_handler,
+            )
+            .await
+            .map_err(handle_config_errors)
+            .ok();
+
+            match topology_controller.reload(new_config).await {
+                ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
+                _ => None,
+            }
+        }
+        Err(RecvError::Lagged(amt)) => {
+            warn!("Overflow, dropped {} signals.", amt);
+            None
+        }
+        Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
+        Ok(signal) => Some(signal),
+    }
+}
+
 pub struct FinishedApplication {
     pub signal: SignalTo,
     pub signal_rx: SignalRx,
@@ -344,7 +370,7 @@ impl FinishedApplication {
     pub async fn shutdown(self) -> ExitStatus {
         let FinishedApplication {
             signal,
-            mut signal_rx,
+            signal_rx,
             topology_controller,
             openssl_legacy_provider,
         } = self;
@@ -357,51 +383,41 @@ impl FinishedApplication {
             .into_inner();
 
         let status = match signal {
-            SignalTo::Shutdown => {
-                emit!(VectorStopped);
-                tokio::select! {
-                    _ = topology_controller.stop() => ExitStatus::from_raw({
-                        #[cfg(windows)]
-                        {
-                            exitcode::OK as u32
-                        }
-                        #[cfg(unix)]
-                        exitcode::OK
-                    }), // Graceful shutdown finished
-                    _ = signal_rx.recv() => {
-                        // It is highly unlikely that this event will exit from topology.
-                        emit!(VectorQuit);
-                        // Dropping the shutdown future will immediately shut the server down
-                        ExitStatus::from_raw({
-                            #[cfg(windows)]
-                            {
-                                exitcode::UNAVAILABLE as u32
-                            }
-                            #[cfg(unix)]
-                            exitcode::OK
-                        })
-                    }
-
-                }
-            }
-            SignalTo::Quit => {
-                // It is highly unlikely that this event will exit from topology.
-                emit!(VectorQuit);
-                drop(topology_controller);
-                ExitStatus::from_raw({
-                    #[cfg(windows)]
-                    {
-                        exitcode::UNAVAILABLE as u32
-                    }
-                    #[cfg(unix)]
-                    exitcode::OK
-                })
-            }
+            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
+            SignalTo::Quit => Self::quit(),
             _ => unreachable!(),
         };
         drop(openssl_legacy_provider);
         status
     }
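
The extracted `handle_signal` keeps the two `broadcast` failure modes distinct: `RecvError::Lagged` only warns and keeps the loop alive, while `RecvError::Closed` is terminal. A small standalone sketch of those semantics with a plain `tokio::sync::broadcast` channel (not Vector code):

```rust
// Sketch of the two error cases a broadcast receiver must handle.
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(2); // capacity 2 forces lag
    for i in 0..4 {
        tx.send(i).unwrap();
    }

    // The two oldest messages were overwritten while the receiver was idle,
    // so the first recv() reports how many were dropped.
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(2))));

    // After reporting the lag, reception resumes at the oldest survivor.
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);

    // All senders gone and the buffer drained: the channel is closed.
    drop(tx);
    assert!(matches!(rx.recv().await, Err(RecvError::Closed)));
}
```
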
+
+    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
+        emit!(VectorStopped);
+        tokio::select! {
+            _ = topology_controller.stop() => ExitStatus::from_raw({
+                #[cfg(windows)]
+                {
+                    exitcode::OK as u32
+                }
+                #[cfg(unix)]
+                exitcode::OK
+            }), // Graceful shutdown finished
+            _ = signal_rx.recv() => Self::quit(),
+        }
+    }
+
+    fn quit() -> ExitStatus {
+        // It is highly unlikely that this event will exit from topology.
+        emit!(VectorQuit);
+        ExitStatus::from_raw({
+            #[cfg(windows)]
+            {
+                exitcode::UNAVAILABLE as u32
+            }
+            #[cfg(unix)]
+            exitcode::OK
+        })
+    }
 }
 
 pub fn init_global() {
diff --git a/src/secrets/exec.rs b/src/secrets/exec.rs
index 9095ddc6efa90..d983f0583a53a 100644
--- a/src/secrets/exec.rs
+++ b/src/secrets/exec.rs
@@ -134,7 +134,7 @@ async fn query_backend(
     loop {
         tokio::select! {
             biased;
-            Ok(signal::SignalTo::Shutdown | signal::SignalTo::Quit) = signal_rx.recv() => {
+            Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
                 drop(command);
                 return Err("Secret retrieval was interrupted.".into());
             }
diff --git a/src/signal.rs b/src/signal.rs
index cb8666325f361..91a1e3515f798 100644
--- a/src/signal.rs
+++ b/src/signal.rs
@@ -1,9 +1,10 @@
 #![allow(missing_docs)]
+use snafu::Snafu;
 use tokio::{runtime::Runtime, sync::broadcast};
 use tokio_stream::{Stream, StreamExt};
 
-use super::config::ConfigBuilder;
+use super::config::{ComponentKey, ConfigBuilder};
 
 pub type ShutdownTx = broadcast::Sender<()>;
 pub type SignalTx = broadcast::Sender<SignalTo>;
@@ -18,11 +19,27 @@ pub enum SignalTo {
     /// Signal to reload config from the filesystem.
     ReloadFromDisk,
     /// Signal to shutdown process.
-    Shutdown,
+    Shutdown(Option<ShutdownError>),
     /// Shutdown process immediately.
     Quit,
 }
 
+#[derive(Clone, Debug, Snafu)]
+pub enum ShutdownError {
+    // For future work: It would be nice if we could keep the actual errors in here, but
+    // `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
+    #[snafu(display("The API failed to start: {error}"))]
+    ApiFailed { error: String },
+    #[snafu(display("Reload failed, and then failed to restore the previous config"))]
+    ReloadFailedToRestore,
+    #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
+    SourceAborted { key: ComponentKey, error: String },
+    #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
+    TransformAborted { key: ComponentKey, error: String },
+    #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
+    SinkAborted { key: ComponentKey, error: String },
+}
+
 /// Convenience struct for app setup handling.
 pub struct SignalPair {
     pub handler: SignalHandler,
@@ -153,11 +170,11 @@ fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> {
     let signal = tokio::select! {
         _ = sigint.recv() => {
             info!(message = "Signal received.", signal = "SIGINT");
-            SignalTo::Shutdown
+            SignalTo::Shutdown(None)
         },
         _ = sigterm.recv() => {
             info!(message = "Signal received.", signal = "SIGTERM");
-            SignalTo::Shutdown
+            SignalTo::Shutdown(None)
         },
         _ = sigquit.recv() => {
             info!(message = "Signal received.", signal = "SIGQUIT");
@@ -181,7 +198,7 @@ fn os_signals(_: &Runtime) -> impl Stream<Item = SignalTo> {
     async_stream::stream! {
         loop {
-            let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown).await;
+            let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
             yield signal;
         }
     }
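
The `Snafu` derive above is what turns each `ShutdownError` variant into the sentence that ultimately reaches the logs and the exit path. A reduced sketch of the mechanism (standalone: uses `String` where Vector uses `ComponentKey`, and assumes `snafu` 0.7+, where `{error}` in a `display` attribute interpolates the field of the same name):

```rust
use snafu::Snafu;

// Deriving `Snafu` generates `Display` and `std::error::Error` impls from
// the `display` attributes, one message per variant.
#[derive(Clone, Debug, Snafu)]
enum ShutdownError {
    #[snafu(display("The API failed to start: {error}"))]
    ApiFailed { error: String },
    #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
    SinkAborted { key: String, error: String },
}

fn main() {
    let err = ShutdownError::SinkAborted {
        key: "out".into(),
        error: "broken pipe".into(),
    };
    assert_eq!(
        err.to_string(),
        r#"The task for sink "out" died during execution: broken pipe"#
    );

    let err = ShutdownError::ApiFailed { error: "address in use".into() };
    println!("{err}"); // The API failed to start: address in use
}
```
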
diff --git a/src/sinks/datadog/traces/apm_stats/integration_tests.rs b/src/sinks/datadog/traces/apm_stats/integration_tests.rs
index 187ad1b9d8150..4ca0207e30371 100644
--- a/src/sinks/datadog/traces/apm_stats/integration_tests.rs
+++ b/src/sinks/datadog/traces/apm_stats/integration_tests.rs
@@ -16,6 +16,7 @@ use tokio::time::{sleep, Duration};
 
 use crate::{
     config::ConfigBuilder,
+    signal::ShutdownError,
     sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig},
     sources::datadog_agent::DatadogAgentConfig,
     test_util::{start_topology, trace_init},
@@ -322,8 +323,8 @@ fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) {
 async fn start_vector() -> (
     RunningTopology,
     (
-        tokio::sync::mpsc::UnboundedSender<()>,
-        tokio::sync::mpsc::UnboundedReceiver<()>,
+        tokio::sync::mpsc::UnboundedSender<ShutdownError>,
+        tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
     ),
 ) {
     let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port());
diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs
index 9a0eccc5700c1..f06529ed77b4b 100644
--- a/src/tap/cmd.rs
+++ b/src/tap/cmd.rs
@@ -53,7 +53,7 @@ pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
     loop {
         tokio::select! {
             biased;
-            Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break,
+            Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
             status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
                 if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect {
                     #[allow(clippy::print_stderr)]
diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs
index 99f9f1b51badb..b87b7abf6a43d 100644
--- a/src/test_util/mod.rs
+++ b/src/test_util/mod.rs
@@ -41,6 +41,7 @@ use zstd::Decoder as ZstdDecoder;
 
 use crate::{
     config::{Config, ConfigDiff, GenerateConfig},
+    signal::ShutdownError,
     topology::{self, RunningTopology},
     trace,
 };
@@ -683,8 +684,8 @@ pub async fn start_topology(
 ) -> (
     RunningTopology,
     (
-        tokio::sync::mpsc::UnboundedSender<()>,
-        tokio::sync::mpsc::UnboundedReceiver<()>,
+        tokio::sync::mpsc::UnboundedSender<ShutdownError>,
+        tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
     ),
 ) {
     config.healthchecks.set_require_healthy(require_healthy);
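
Call sites such as `tap` and the secrets `exec` backend above only care that a shutdown was requested, not why, so they match `Shutdown(_)` and leave the payload to the top-level application loop. A toy illustration with a hypothetical mini-enum mirroring the shape of `SignalTo` (the numeric codes follow the `exitcode` convention, where `UNAVAILABLE` is 69):

```rust
#[derive(Clone, Debug)]
enum SignalTo {
    Shutdown(Option<String>), // reason, if a component crashed
    Quit,
}

fn exit_code(signal: &SignalTo) -> i32 {
    match signal {
        // The reason was already logged; only the variant matters here.
        SignalTo::Shutdown(_) => 0, // exitcode::OK
        SignalTo::Quit => 69,       // exitcode::UNAVAILABLE
    }
}

fn main() {
    let crashed = SignalTo::Shutdown(Some("sink aborted".to_string()));
    assert_eq!(exit_code(&crashed), 0);
    assert_eq!(exit_code(&SignalTo::Quit), 69);
}
```
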
diff --git a/src/topology/controller.rs b/src/topology/controller.rs
index 9b64b7f050279..529c4b93d95db 100644
--- a/src/topology/controller.rs
+++ b/src/topology/controller.rs
@@ -16,7 +16,7 @@ use crate::internal_events::{
     VectorConfigLoadError, VectorRecoveryError, VectorReloadError, VectorReloaded,
 };
 
-use crate::{config, topology::RunningTopology};
+use crate::{config, signal::ShutdownError, topology::RunningTopology};
 
 #[derive(Clone, Debug)]
 pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);
@@ -54,12 +54,13 @@ impl std::fmt::Debug for TopologyController {
     }
 }
 
+#[derive(Clone, Debug)]
 pub enum ReloadOutcome {
     NoConfig,
     MissingApiKey,
     Success,
     RolledBack,
-    FatalError,
+    FatalError(ShutdownError),
 }
 
 impl TopologyController {
@@ -104,7 +105,6 @@ impl TopologyController {
             }
         } else if self.api_server.is_none() {
             use crate::internal_events::ApiStarted;
-            use crate::topology::ReloadOutcome::FatalError;
             use std::sync::atomic::AtomicBool;
             use tokio::runtime::Handle;
 
@@ -124,9 +124,10 @@ impl TopologyController {
                     Some(api_server)
                 }
-                Err(e) => {
-                    error!("An error occurred that Vector couldn't handle: {}.", e);
-                    return FatalError;
+                Err(error) => {
+                    let error = error.to_string();
+                    error!("An error occurred that Vector couldn't handle: {}.", error);
+                    return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
                 }
             }
         }
@@ -152,7 +153,7 @@ impl TopologyController {
             Err(()) => {
                 emit!(VectorReloadError);
                 emit!(VectorRecoveryError);
-                ReloadOutcome::FatalError
+                ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
             }
         }
     }
diff --git a/src/topology/mod.rs b/src/topology/mod.rs
index bb5299839d025..2b3991914cd8a 100644
--- a/src/topology/mod.rs
+++ b/src/topology/mod.rs
@@ -34,6 +34,7 @@ use vector_buffers::topology::channel::{BufferReceiverStream, BufferSender};
 use crate::{
     config::{ComponentKey, Config, ConfigDiff, Inputs, OutputId},
     event::EventArray,
+    signal::ShutdownError,
     topology::{builder::Pieces, task::Task},
 };
@@ -80,7 +81,10 @@ pub async fn start_validated(
     mut pieces: Pieces,
 ) -> Option<(
     RunningTopology,
-    (mpsc::UnboundedSender<()>, mpsc::UnboundedReceiver<()>),
+    (
+        mpsc::UnboundedSender<ShutdownError>,
+        mpsc::UnboundedReceiver<ShutdownError>,
+    ),
 )> {
     let (abort_tx, abort_rx) = mpsc::unbounded_channel();
@@ -151,7 +155,8 @@ pub(super) fn take_healthchecks(
 
 async fn handle_errors(
     task: impl Future<Output = TaskResult>,
-    abort_tx: mpsc::UnboundedSender<()>,
+    abort_tx: mpsc::UnboundedSender<ShutdownError>,
+    error: impl FnOnce(String) -> ShutdownError,
 ) -> TaskResult {
     AssertUnwindSafe(task)
         .catch_unwind()
@@ -160,7 +165,7 @@ async fn handle_errors(
         .and_then(|res| res)
         .map_err(|e| {
             error!("An error occurred that Vector couldn't handle: {}.", e);
-            _ = abort_tx.send(());
+            _ = abort_tx.send(error(e.to_string()));
             e
         })
 }
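
In `handle_errors` above, the new `error: impl FnOnce(String) -> ShutdownError` parameter is a constructor supplied by the caller: one generic wrapper can then tag a failure as `SourceAborted`, `TransformAborted`, or `SinkAborted` without knowing which component kind it wraps. A standalone sketch of the closure-as-constructor pattern (illustrative types, not Vector's):

```rust
#[derive(Debug)]
enum ShutdownError {
    SourceAborted { key: String, error: String },
    SinkAborted { key: String, error: String },
}

// The generic wrapper knows how to detect a failure, but delegates the
// choice of variant to the `make_error` constructor it was given.
fn report<F>(result: Result<(), String>, make_error: F) -> Option<ShutdownError>
where
    F: FnOnce(String) -> ShutdownError,
{
    // The error value is only built on the failure path.
    result.err().map(make_error)
}

fn main() {
    let key = "in".to_string();
    let as_source = |error| ShutdownError::SourceAborted { key, error };
    println!("{:?}", report(Err("task panicked".into()), as_source));

    let key = "out".to_string();
    let as_sink = |error| ShutdownError::SinkAborted { key, error };
    println!("{:?}", report(Ok(()), as_sink)); // success path: None
}
```

This is also why the spawn sites below clone `key` into a small block before calling `handle_errors`: the closure must own the key it moves into the error variant.
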
diff --git a/src/topology/running.rs b/src/topology/running.rs
index 87fe3fe276f74..78cf93bab28d4 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -20,6 +20,7 @@ use crate::{
     config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
     event::EventArray,
     shutdown::SourceShutdownCoordinator,
+    signal::ShutdownError,
     spawn_named,
     topology::{
         build_or_log_errors, builder,
@@ -42,14 +43,14 @@ pub struct RunningTopology {
     shutdown_coordinator: SourceShutdownCoordinator,
     detach_triggers: HashMap<ComponentKey, Trigger>,
     pub(crate) config: Config,
-    abort_tx: mpsc::UnboundedSender<()>,
+    abort_tx: mpsc::UnboundedSender<ShutdownError>,
     watch: (WatchTx, WatchRx),
     pub(crate) running: Arc<AtomicBool>,
     graceful_shutdown_duration: Option<Duration>,
 }
 
 impl RunningTopology {
-    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<()>) -> Self {
+    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
         Self {
             inputs: HashMap::new(),
             inputs_tap_metadata: HashMap::new(),
@@ -855,7 +856,13 @@ impl RunningTopology {
         }
 
         let task_name = format!(">> {} ({})", task.typetag(), task.id());
-        let task = handle_errors(task, self.abort_tx.clone()).instrument(task_span);
+        let task = {
+            let key = key.clone();
+            handle_errors(task, self.abort_tx.clone(), |error| {
+                ShutdownError::SinkAborted { key, error }
+            })
+        }
+        .instrument(task_span);
         let spawned = spawn_named(task, task_name.as_ref());
         if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
             drop(previous); // detach and forget
@@ -892,7 +899,13 @@ impl RunningTopology {
         }
 
         let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
-        let task = handle_errors(task, self.abort_tx.clone()).instrument(task_span);
+        let task = {
+            let key = key.clone();
+            handle_errors(task, self.abort_tx.clone(), |error| {
+                ShutdownError::TransformAborted { key, error }
+            })
+        }
+        .instrument(task_span);
         let spawned = spawn_named(task, task_name.as_ref());
         if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
             drop(previous); // detach and forget
@@ -930,7 +943,13 @@ impl RunningTopology {
         }
 
         let task_name = format!("{} ({}) >>", task.typetag(), task.id());
-        let task = handle_errors(task, self.abort_tx.clone()).instrument(task_span.clone());
+        let task = {
+            let key = key.clone();
+            handle_errors(task, self.abort_tx.clone(), |error| {
+                ShutdownError::SourceAborted { key, error }
+            })
+        }
+        .instrument(task_span.clone());
         let spawned = spawn_named(task, task_name.as_ref());
         if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
             drop(previous); // detach and forget
@@ -941,7 +960,13 @@ impl RunningTopology {
 
         // Now spawn the actual source task.
         let source_task = new_pieces.source_tasks.remove(key).unwrap();
-        let source_task = handle_errors(source_task, self.abort_tx.clone()).instrument(task_span);
+        let source_task = {
+            let key = key.clone();
+            handle_errors(source_task, self.abort_tx.clone(), |error| {
+                ShutdownError::SourceAborted { key, error }
+            })
+        }
+        .instrument(task_span);
         self.source_tasks
             .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
     }
diff --git a/src/vector_windows.rs b/src/vector_windows.rs
index 5ba3548ce5a25..7aac7cef587ff 100644
--- a/src/vector_windows.rs
+++ b/src/vector_windows.rs
@@ -379,7 +379,7 @@ fn run_service(_arguments: Vec<OsString>) -> Result<()> {
 
             // Handle stop
             ServiceControl::Stop => {
-                while signal_tx.send(SignalTo::Shutdown).is_err() {}
+                while signal_tx.send(SignalTo::Shutdown(None)).is_err() {}
 
                 ServiceControlHandlerResult::NoError
             }
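
One property the Windows service hunk relies on: `broadcast::Sender::send` returns an error whenever no receiver is currently subscribed, so the `while ... is_err() {}` loop spins until the main thread's receiver is up and the stop signal is actually delivered. A compact demonstration with a plain tokio broadcast channel (outside Vector):

```rust
use tokio::sync::broadcast;

fn main() {
    let (tx, rx0) = broadcast::channel::<&str>(16);
    drop(rx0); // no subscribers yet: sends are rejected

    assert!(tx.send("shutdown").is_err());

    let mut rx = tx.subscribe(); // a receiver appears...
    assert!(tx.send("shutdown").is_ok()); // ...and the send is accepted

    assert_eq!(rx.try_recv().unwrap(), "shutdown");
}
```
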