Skip to content

Commit

Permalink
feat: Restart stalled Block Streams & Executors (#891)
Browse files Browse the repository at this point in the history
Coordinator will now continuously monitor Block Stream and Executor
health, and restart them if they are "stalled". The goal here is to
avoid the need for manually intervening on stopped processes.

Stalled means slightly different things for each:
- Block Stream - Is able to process blocks, but is not actively doing so
- Executors - An uncaught error was encountered, causing the thread to
exit
  • Loading branch information
morgsmccauley authored Jul 18, 2024
1 parent 29bde3c commit ee5845b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
33 changes: 32 additions & 1 deletion coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#![cfg_attr(test, allow(dead_code))]

use std::time::{Duration, SystemTime};

pub use block_streamer::StreamInfo;

use anyhow::Context;
use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{
start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest,
ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest,
ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest,
};
use near_primitives::types::AccountId;
use registry_types::StartBlock;
Expand Down Expand Up @@ -235,6 +237,34 @@ impl BlockStreamsHandler {
Ok(())
}

/// Restarts `block_stream` unless its reported health is both recent and not stalled.
///
/// The stream is left alone only when health information is present, its
/// `updated_at_timestamp_secs` is no more than 30 seconds old, and its
/// `processing_state` does not decode to `ProcessingState::Stalled`. In every
/// other case (including missing health information) the stream is stopped
/// via `stop` and then started again via `resume_block_stream`.
///
/// # Errors
///
/// Propagates any error returned by `stop` or `resume_block_stream`.
async fn ensure_healthy(
    &self,
    config: &IndexerConfig,
    block_stream: &StreamInfo,
) -> anyhow::Result<()> {
    let healthy = match block_stream.health.as_ref() {
        // No health report at all is treated the same as an unhealthy stream.
        None => false,
        Some(health) => {
            let last_update = SystemTime::UNIX_EPOCH
                + Duration::from_secs(health.updated_at_timestamp_secs);

            // A timestamp in the future makes `elapsed` fail; that case falls
            // back to a zero age and is therefore considered fresh.
            let age = last_update.elapsed().unwrap_or_default();
            let is_fresh = age <= Duration::from_secs(30);

            let is_stalled = matches!(
                health.processing_state.try_into(),
                Ok(ProcessingState::Stalled)
            );

            is_fresh && !is_stalled
        }
    };

    if healthy {
        return Ok(());
    }

    tracing::info!("Restarting stalled block stream");

    self.stop(block_stream.stream_id.clone()).await?;
    self.resume_block_stream(config).await?;

    Ok(())
}

pub async fn synchronise_block_stream(
&self,
config: &IndexerConfig,
Expand All @@ -246,6 +276,7 @@ impl BlockStreamsHandler {

if let Some(block_stream) = block_stream {
if block_stream.version == config.get_registry_version() {
self.ensure_healthy(config, &block_stream).await?;
return Ok(());
}

Expand Down
28 changes: 27 additions & 1 deletion coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ pub use runner::ExecutorInfo;

use anyhow::Context;
use runner::runner_client::RunnerClient;
use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest};
use runner::{
ExecutionState, GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest,
StopExecutorRequest,
};
use tonic::transport::channel::Channel;
use tonic::Request;

Expand Down Expand Up @@ -119,13 +122,36 @@ impl ExecutorsHandler {
Ok(())
}

/// Restarts `executor` unless it reports a health state other than stalled.
///
/// The executor is left alone only when health information is present and
/// its `execution_state` does not decode to `ExecutionState::Stalled`.
/// Otherwise (including when no health information is present) it is
/// stopped via `stop` and started again via `start`.
///
/// # Errors
///
/// Propagates any error returned by `stop` or `start`.
async fn ensure_healthy(
    &self,
    config: &IndexerConfig,
    executor: ExecutorInfo,
) -> anyhow::Result<()> {
    let needs_restart = match executor.health.as_ref() {
        Some(health) => matches!(
            health.execution_state.try_into(),
            Ok(ExecutionState::Stalled)
        ),
        // No health report at all is treated the same as a stalled executor.
        None => true,
    };

    if !needs_restart {
        return Ok(());
    }

    tracing::info!("Restarting stalled executor");

    self.stop(executor.executor_id).await?;
    self.start(config).await?;

    Ok(())
}

pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> {
let executor = self
.get(config.account_id.clone(), config.function_name.clone())
.await?;

if let Some(executor) = executor {
if executor.version == config.get_registry_version() {
self.ensure_healthy(config, executor).await?;
return Ok(());
}

Expand Down

0 comments on commit ee5845b

Please sign in to comment.