Skip to content

Commit

Permalink
feat: Restart unhealthy block streams
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jul 17, 2024
1 parent 606bc64 commit b8bdcae
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Context;
use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{
start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest,
ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest,
ListStreamsRequest, ProcessingState, StartStreamRequest, Status, StopStreamRequest,
};
use near_primitives::types::AccountId;
use registry_types::StartBlock;
Expand Down Expand Up @@ -235,6 +235,28 @@ impl BlockStreamsHandler {
Ok(())
}

async fn ensure_healthy(
&self,
config: &IndexerConfig,
block_stream: &StreamInfo,
) -> anyhow::Result<()> {
if let Some(health) = block_stream.health.as_ref() {
if !matches!(
health.processing_state.try_into(),
Ok(ProcessingState::Stalled)
) {
return Ok(());
}
}

tracing::info!("Restarting stalled block stream");

self.stop(block_stream.stream_id.clone()).await?;
self.resume_block_stream(config).await?;

Ok(())
}

pub async fn synchronise_block_stream(
&self,
config: &IndexerConfig,
Expand All @@ -246,6 +268,7 @@ impl BlockStreamsHandler {

if let Some(block_stream) = block_stream {
if block_stream.version == config.get_registry_version() {
self.ensure_healthy(config, &block_stream).await?;
return Ok(());
}

Expand Down

0 comments on commit b8bdcae

Please sign in to comment.