Skip to content

Commit

Permalink
feat: Restart unhealthy executors
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jul 17, 2024
1 parent b8bdcae commit d18289b
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo;

use anyhow::Context;
use runner::runner_client::RunnerClient;
use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest};
use runner::{
ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest,
StopExecutorRequest,
};
use tonic::transport::channel::Channel;
use tonic::Request;

Expand Down Expand Up @@ -119,13 +122,36 @@ impl ExecutorsHandler {
Ok(())
}

async fn ensure_healthy(
&self,
config: &IndexerConfig,
executor: ExecutorInfo,
) -> anyhow::Result<()> {
if let Some(health) = executor.health {
if !matches!(
health.execution_state.try_into(),
Ok(ExecutionState::Stalled)
) {
return Ok(());
}
}

tracing::info!("Restarting stalled executor");

self.stop(executor.executor_id).await?;
self.start(config).await?;

Ok(())
}

pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> {
let executor = self
.get(config.account_id.clone(), config.function_name.clone())
.await?;

if let Some(executor) = executor {
if executor.version == config.get_registry_version() {
self.ensure_healthy(config, executor).await?;
return Ok(());
}

Expand Down

0 comments on commit d18289b

Please sign in to comment.