Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(reload): restart api server based on topology #17958

Merged
merged 3 commits on Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/api/server.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,7 @@ use async_graphql::{
Data, Request, Schema,
};
use async_graphql_warp::{graphql_protocol, GraphQLResponse, GraphQLWebSocket};
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use warp::{filters::BoxedFilter, http::Response, ws::Ws, Filter, Reply};

Expand All @@ -31,13 +32,13 @@ impl Server {
config: &config::Config,
watch_rx: topology::WatchRx,
running: Arc<AtomicBool>,
runtime: &tokio::runtime::Runtime,
handle: &Handle,
) -> crate::Result<Self> {
let routes = make_routes(config.api.playground, watch_rx, running);

let (_shutdown, rx) = oneshot::channel();
// warp uses `tokio::spawn` and so needs us to enter the runtime context.
let _guard = runtime.enter();
let _guard = handle.enter();
let (addr, server) = warp::serve(routes)
.try_bind_with_graceful_shutdown(
config.api.address.expect("No socket address"),
Expand All @@ -57,7 +58,7 @@ impl Server {
schema::components::update_config(config);

// Spawn the server in the background.
runtime.spawn(server);
handle.spawn(server);

Ok(Self { _shutdown, addr })
}
Expand Down
14 changes: 8 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::os::windows::process::ExitStatusExt;
use tokio::runtime::Handle;

pub static WORKER_THREADS: OnceNonZeroUsize = OnceNonZeroUsize::new();

Expand Down Expand Up @@ -122,13 +123,13 @@ impl ApplicationConfig {

/// Configure the API server, if applicable
#[cfg(feature = "api")]
pub fn setup_api(&self, runtime: &Runtime) -> Option<api::Server> {
pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
if self.api.enabled {
match api::Server::start(
self.topology.config(),
self.topology.watch(),
std::sync::Arc::clone(&self.topology.running),
runtime,
handle,
) {
Ok(api_server) => {
emit!(ApiStarted {
Expand Down Expand Up @@ -159,7 +160,8 @@ impl Application {
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare().and_then(|(runtime, app)| app.start(&runtime).map(|app| (runtime, app)))
Self::prepare()
.and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
}

pub fn prepare() -> Result<(Runtime, Self), ExitCode> {
Expand Down Expand Up @@ -208,13 +210,13 @@ impl Application {
))
}

pub fn start(self, runtime: &Runtime) -> Result<StartedApplication, ExitCode> {
pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
// Any internal_logs sources will have grabbed a copy of the
// early buffer by this point and set up a subscriber.
crate::trace::stop_early_buffering();

emit!(VectorStarted);
runtime.spawn(heartbeat::heartbeat());
handle.spawn(heartbeat::heartbeat());

let Self {
require_healthy,
Expand All @@ -224,7 +226,7 @@ impl Application {

let topology_controller = SharedTopologyController::new(TopologyController {
#[cfg(feature = "api")]
api_server: config.setup_api(runtime),
api_server: config.setup_api(handle),
topology: config.topology,
config_paths: config.config_paths.clone(),
require_healthy,
Expand Down
38 changes: 38 additions & 0 deletions src/topology/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
#[cfg(feature = "enterprise")]
use futures_util::future::BoxFuture;
use futures_util::FutureExt as _;

use tokio::sync::{Mutex, MutexGuard};

#[cfg(feature = "api")]
Expand All @@ -14,6 +15,7 @@ use crate::config::enterprise::{
use crate::internal_events::{
VectorConfigLoadError, VectorRecoveryError, VectorReloadError, VectorReloaded,
};

use crate::{config, topology::RunningTopology};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -93,6 +95,42 @@ impl TopologyController {
}
}

// Start the api server or disable it, if necessary
#[cfg(feature = "api")]
if !new_config.api.enabled {
if let Some(server) = self.api_server.take() {
debug!("Dropping api server.");
drop(server)
}
} else if self.api_server.is_none() {
use crate::internal_events::ApiStarted;
use crate::topology::ReloadOutcome::FatalError;
use std::sync::atomic::AtomicBool;
use tokio::runtime::Handle;

debug!("Starting api server.");

self.api_server = match api::Server::start(
self.topology.config(),
self.topology.watch(),
Arc::<AtomicBool>::clone(&self.topology.running),
&Handle::current(),
) {
Ok(api_server) => {
emit!(ApiStarted {
addr: new_config.api.address.unwrap(),
playground: new_config.api.playground
});

Some(api_server)
}
Err(e) => {
error!("An error occurred that Vector couldn't handle: {}.", e);
return FatalError;
}
}
}

match self.topology.reload_config_and_respawn(new_config).await {
Ok(true) => {
#[cfg(feature = "api")]
Expand Down