diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index b1c004b0..b549fdea 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -18,7 +18,19 @@ use tonic::transport::channel::Channel; use crate::indexer_config::IndexerConfig; use crate::redis::{KeyProvider, RedisClient}; -const RESTART_TIMEOUT_SECONDS: u64 = 600; +#[derive(Debug, PartialEq)] +pub enum BlockStreamStatus { + /// Block Stream is running as expected + Active, + /// Existing Block Stream is in an unhealthy state + Unhealthy, + /// Existing Block Stream is not running + Inactive, + /// Block Stream is not synchronized with the latest config + Outdated, + /// Block Stream has not been encountered before + NotStarted, +} #[cfg(not(test))] use BlockStreamsClientWrapperImpl as BlockStreamsClientWrapper; @@ -191,7 +203,10 @@ impl BlockStreamsHandlerImpl { Ok(()) } - async fn reconfigure(&self, config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn reconfigure(&self, config: &IndexerConfig) -> anyhow::Result<()> { + self.stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await?; + if matches!( config.start_block, StartBlock::Latest | StartBlock::Height(..) @@ -216,7 +231,7 @@ impl BlockStreamsHandlerImpl { Ok(()) } - async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { let height = match config.start_block { StartBlock::Height(height) => height, StartBlock::Latest => config.get_registry_version(), @@ -254,7 +269,7 @@ impl BlockStreamsHandlerImpl { Ok(height) } - async fn resume(&self, config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn resume(&self, config: &IndexerConfig) -> anyhow::Result<()> { let height = self.get_continuation_block_height(config).await?; tracing::info!(height, "Resuming block stream"); @@ -264,11 +279,7 @@ impl BlockStreamsHandlerImpl { Ok(()) } - async fn ensure_healthy( - &self, - config: &IndexerConfig, - block_stream: &StreamInfo, - ) -> anyhow::Result<()> { + fn is_healthy(&self, block_stream: &StreamInfo) -> bool { if let Some(health) = block_stream.health.as_ref() { let updated_at = SystemTime::UNIX_EPOCH + Duration::from_secs(health.updated_at_timestamp_secs); @@ -280,84 +291,67 @@ impl BlockStreamsHandlerImpl { ); if !stale && !stalled { - return Ok(()); - } else { - tracing::info!( - stale, - stalled, - "Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds" - ); + return true; } - } else { - tracing::info!( - "Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds" - ); } - self.stop(block_stream.stream_id.clone()).await?; - tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; - let height = self.get_continuation_block_height(config).await?; - self.start(height, config).await?; + false + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(block_stream) = self.get(account_id, function_name).await? { + tracing::info!("Stopping block stream"); + + self.stop(block_stream.stream_id).await?; + } Ok(()) } - pub async fn synchronise( + pub async fn get_status( &self, config: &IndexerConfig, previous_sync_version: Option, - ) -> anyhow::Result<()> { - let block_stream = self + ) -> anyhow::Result { + if let Some(block_stream) = self .get(config.account_id.clone(), config.function_name.clone()) - .await?; - - if let Some(block_stream) = block_stream { - if block_stream.version == config.get_registry_version() { - self.ensure_healthy(config, &block_stream).await?; - return Ok(()); + .await? + { + if block_stream.version != config.get_registry_version() { + return Ok(BlockStreamStatus::Outdated); } - tracing::info!( - previous_version = block_stream.version, - "Stopping outdated block stream" - ); - - self.stop(block_stream.stream_id.clone()).await?; - - self.reconfigure(config).await?; + if !self.is_healthy(&block_stream) { + return Ok(BlockStreamStatus::Unhealthy); + } - return Ok(()); + return Ok(BlockStreamStatus::Active); } if previous_sync_version.is_none() { - self.start_new_block_stream(config).await?; - - return Ok(()); + return Ok(BlockStreamStatus::NotStarted); } if previous_sync_version.unwrap() != config.get_registry_version() { - self.reconfigure(config).await?; - - return Ok(()); + return Ok(BlockStreamStatus::Outdated); } - self.resume(config).await?; - - Ok(()) + Ok(BlockStreamStatus::Inactive) } - pub async fn stop_if_needed( - &self, - account_id: AccountId, - function_name: String, - ) -> anyhow::Result<()> { - if let Some(block_stream) = self.get(account_id, function_name).await? { - tracing::info!("Stopping block stream"); - - self.stop(block_stream.stream_id).await?; + pub async fn restart(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if let Some(block_stream) = self + .get(config.account_id.clone(), config.function_name.clone()) + .await? + { + self.stop(block_stream.stream_id.clone()).await?; } - Ok(()) + self.resume(config).await } } @@ -381,18 +375,84 @@ mod tests { } #[tokio::test] - async fn resumes_stopped_streams() { + async fn returns_stream_status() { + let config = IndexerConfig::default(); + let test_cases = [ + ( + Some(StreamInfo { + version: config.get_registry_version(), + health: Some(block_streamer::Health { + updated_at_timestamp_secs: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + processing_state: ProcessingState::Running.into(), + }), + ..Default::default() + }), + Some(config.get_registry_version()), + BlockStreamStatus::Active, + ), + ( + None, + Some(config.get_registry_version()), + BlockStreamStatus::Inactive, + ), + ( + Some(StreamInfo { + version: config.get_registry_version() - 1, + ..Default::default() + }), + Some(config.get_registry_version()), + BlockStreamStatus::Outdated, + ), + ( + Some(StreamInfo { + version: config.get_registry_version(), + health: None, + ..Default::default() + }), + Some(config.get_registry_version()), + BlockStreamStatus::Unhealthy, + ), + (None, None, BlockStreamStatus::NotStarted), + ]; + + for (stream, previous_sync_version, expected) in test_cases { + let mut mock_client = BlockStreamsClientWrapper::default(); + mock_client + .expect_get_stream::() + .returning(move |_| { + if let Some(stream) = stream.clone() { + Ok(Response::new(stream)) + } else { + Err(tonic::Status::not_found("not found")) + } + }); + + let mock_redis = RedisClient::default(); + + let handler = BlockStreamsHandlerImpl { + client: mock_client, + redis_client: mock_redis, + }; + + assert_eq!( + expected, + handler + .get_status(&config, previous_sync_version) + .await + .unwrap() + ); + } + } + + #[tokio::test] + async fn resumes_streams() { let config = IndexerConfig::default(); let last_published_block = 10; let mut mock_client = BlockStreamsClientWrapper::default(); - mock_client - .expect_get_stream::() - .with(eq(GetStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| Err(tonic::Status::not_found("not found"))); mock_client .expect_start_stream::() .with(eq(StartStreamRequest { @@ -406,26 +466,25 @@ mod tests { start_block_height: last_published_block + 1, version: config.get_registry_version(), })) - .returning(|_| Ok(Response::new(StartStreamResponse::default()))); + .returning(|_| Ok(Response::new(StartStreamResponse::default()))) + .once(); let mut mock_redis = RedisClient::default(); mock_redis .expect_get_last_published_block::() - .returning(move |_| Ok(Some(last_published_block))); + .returning(move |_| Ok(Some(last_published_block))) + .once(); let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; - handler - .synchronise(&config, Some(config.get_registry_version())) - .await - .unwrap(); + handler.resume(&config).await.unwrap(); } #[tokio::test] - async fn reconfigures_outdated_streams() { + async fn reconfigures_streams() { let config = IndexerConfig::default(); let existing_stream = StreamInfo { @@ -480,10 +539,7 @@ mod tests { redis_client: mock_redis, }; - handler - .synchronise(&config, Some(config.get_registry_version())) - .await - .unwrap(); + handler.reconfigure(&config).await.unwrap(); } #[tokio::test] @@ -491,13 +547,6 @@ mod tests { let config = IndexerConfig::default(); let mut mock_client = BlockStreamsClientWrapper::default(); - mock_client - .expect_get_stream::() - .with(eq(GetStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| Err(tonic::Status::not_found("not found"))); mock_client .expect_start_stream::() .with(eq(StartStreamRequest { @@ -524,62 +573,14 @@ mod tests { redis_client: mock_redis, }; - handler.synchronise(&config, None).await.unwrap(); - } - - #[tokio::test] - async fn reconfigures_outdated_and_stopped_streams() { - let config = IndexerConfig { - start_block: StartBlock::Latest, - ..Default::default() - }; - - let mut mock_client = BlockStreamsClientWrapper::default(); - mock_client - .expect_get_stream::() - .with(eq(GetStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| Err(tonic::Status::not_found("not found"))); - mock_client - .expect_start_stream::() - .with(eq(StartStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - redis_stream: config.get_redis_stream_key(), - rule: Some(Rule::ActionAnyRule(ActionAnyRule { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any.into(), - })), - start_block_height: config.get_registry_version(), - version: config.get_registry_version(), - })) - .returning(|_| Ok(Response::new(StartStreamResponse::default()))); - - let mut mock_redis = RedisClient::default(); - mock_redis - .expect_clear_block_stream::() - .returning(|_| Ok(())) - .once(); - - let handler = BlockStreamsHandlerImpl { - client: mock_client, - redis_client: mock_redis, - }; - - handler - .synchronise(&config, Some(config.get_registry_version() - 1)) - .await - .unwrap(); + handler.start_new_block_stream(&config).await.unwrap(); } #[tokio::test] - async fn restarts_unhealthy_streams() { + async fn unhealthy_stream() { tokio::time::pause(); let config = IndexerConfig::default(); - let last_published_block = 10; let existing_stream = StreamInfo { account_id: config.account_id.to_string(), @@ -595,53 +596,19 @@ mod tests { }), }; - let mut mock_client = BlockStreamsClientWrapper::default(); - mock_client - .expect_stop_stream::() - .with(eq(StopStreamRequest { - stream_id: existing_stream.stream_id.clone(), - })) - .returning(|_| Ok(Response::new(StopStreamResponse::default()))); - mock_client - .expect_get_stream::() - .with(eq(GetStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(move |_| Ok(Response::new(existing_stream.clone()))); - mock_client - .expect_start_stream::() - .with(eq(StartStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - redis_stream: config.get_redis_stream_key(), - rule: Some(Rule::ActionAnyRule(ActionAnyRule { - affected_account_id: "queryapi.dataplatform.near".to_string(), - status: Status::Any.into(), - })), - start_block_height: last_published_block + 1, - version: config.get_registry_version(), - })) - .returning(|_| Ok(Response::new(StartStreamResponse::default()))); - - let mut mock_redis = RedisClient::default(); - mock_redis - .expect_get_last_published_block::() - .returning(move |_| Ok(Some(last_published_block))); + let mock_client = BlockStreamsClientWrapper::default(); + let mock_redis = RedisClient::default(); let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; - handler - .synchronise(&config, Some(config.get_registry_version() - 1)) - .await - .unwrap(); + assert!(!handler.is_healthy(&existing_stream)); } #[tokio::test] - async fn ignores_healthy_streams() { + async fn healthy_streams() { tokio::time::pause(); let config = IndexerConfig::default(); @@ -667,21 +634,7 @@ mod tests { }), }; - let mut mock_client = BlockStreamsClientWrapper::default(); - mock_client - .expect_get_stream::() - .with(eq(GetStreamRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(move |_| Ok(Response::new(existing_stream.clone()))); - mock_client - .expect_stop_stream::() - .never(); - mock_client - .expect_start_stream::() - .never(); - + let mock_client = BlockStreamsClientWrapper::default(); let mock_redis = RedisClient::default(); let handler = BlockStreamsHandlerImpl { @@ -689,10 +642,7 @@ mod tests { redis_client: mock_redis, }; - handler - .synchronise(&config, Some(config.get_registry_version())) - .await - .unwrap(); + assert!(handler.is_healthy(&existing_stream)); } } @@ -709,6 +659,10 @@ mod tests { }; let mut mock_client = BlockStreamsClientWrapper::default(); + mock_client + .expect_get_stream::() + .returning(|_| Err(tonic::Status::not_found("not found"))) + .times(3); mock_client .expect_start_stream::() .with(always()) diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 8b761e99..55133f76 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -14,7 +14,17 @@ use tonic::transport::channel::Channel; use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; -const RESTART_TIMEOUT_SECONDS: u64 = 600; +#[derive(Debug, PartialEq)] +pub enum ExecutorStatus { + /// Executor is running as expected + Active, + /// Executor is in an unhealthy state + Unhealthy, + /// Executor is not running + Inactive, + /// Executor is not synchronized with the latest config + Outdated, +} #[cfg(not(test))] use ExecutorsClientWrapperImpl as ExecutorsClientWrapper; @@ -148,56 +158,40 @@ impl ExecutorsHandlerImpl { Ok(()) } - async fn ensure_healthy( - &self, - config: &IndexerConfig, - executor: ExecutorInfo, - ) -> anyhow::Result<()> { + fn is_healthy(&self, executor: ExecutorInfo) -> bool { if let Some(health) = executor.health { - if !matches!( + return !matches!( health.execution_state.try_into(), Ok(ExecutionState::Stalled) - ) { - return Ok(()); - } + ); } - tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds"); - - self.stop(executor.executor_id).await?; - tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; - self.start(config).await?; - - Ok(()) + false } - pub async fn synchronise(&self, config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn get_status(&self, config: &IndexerConfig) -> anyhow::Result { let executor = self .get(config.account_id.clone(), config.function_name.clone()) .await?; if let Some(executor) = executor { - if executor.version == config.get_registry_version() { - self.ensure_healthy(config, executor).await?; - return Ok(()); + if executor.version != config.get_registry_version() { + return Ok(ExecutorStatus::Outdated); } - tracing::info!( - account_id = config.account_id.as_str(), - function_name = config.function_name, - version = executor.version, - "Stopping outdated executor" - ); + if !self.is_healthy(executor) { + return Ok(ExecutorStatus::Unhealthy); + } - self.stop(executor.executor_id).await?; + return Ok(ExecutorStatus::Active); } - tracing::info!( - account_id = config.account_id.as_str(), - function_name = config.function_name, - version = config.get_registry_version(), - "Starting executor" - ); + Ok(ExecutorStatus::Inactive) + } + + pub async fn restart(&self, config: &IndexerConfig) -> anyhow::Result<()> { + self.stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await?; self.start(config).await?; @@ -238,18 +232,63 @@ mod tests { } #[tokio::test] - async fn resumes_stopped_executors() { + async fn returns_executor_status() { + let config = IndexerConfig::default(); + let test_cases = [ + ( + Some(ExecutorInfo { + version: config.get_registry_version(), + health: None, + ..Default::default() + }), + ExecutorStatus::Unhealthy, + ), + (None, ExecutorStatus::Inactive), + ( + Some(ExecutorInfo { + version: config.get_registry_version() - 1, + ..Default::default() + }), + ExecutorStatus::Outdated, + ), + ( + Some(ExecutorInfo { + version: config.get_registry_version(), + health: Some(runner::Health { + execution_state: runner::ExecutionState::Running.into(), + }), + ..Default::default() + }), + ExecutorStatus::Active, + ), + ]; + + for (executor, expected_status) in test_cases { + let mut mock_client = ExecutorsClientWrapper::default(); + mock_client + .expect_get_executor::() + .with(always()) + .returning(move |_| { + if let Some(executor) = executor.clone() { + Ok(Response::new(executor)) + } else { + Err(tonic::Status::not_found("not found")) + } + }); + + let handler = ExecutorsHandlerImpl { + client: mock_client, + }; + + assert_eq!(handler.get_status(&config).await.unwrap(), expected_status); + } + } + + #[tokio::test] + async fn starts_executors() { let config = IndexerConfig::default(); let mut mock_client = ExecutorsClientWrapper::default(); - mock_client - .expect_get_executor::() - .with(eq(GetExecutorRequest { - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| Err(tonic::Status::not_found("not found"))) - .once(); mock_client .expect_start_executor::() .with(eq(StartExecutorRequest { @@ -271,11 +310,11 @@ mod tests { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + handler.start(&config).await.unwrap() } #[tokio::test] - async fn reconfigures_outdated_executors() { + async fn restarts_executors() { let config = IndexerConfig::default(); let executor = ExecutorInfo { @@ -324,11 +363,11 @@ mod tests { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + handler.restart(&config).await.unwrap() } #[tokio::test] - async fn restarts_unhealthy_executors() { + async fn unhealthy_executor() { tokio::time::pause(); let config = IndexerConfig::default(); @@ -343,49 +382,17 @@ mod tests { }), }; - let mut mock_client = ExecutorsClientWrapper::default(); - mock_client - .expect_stop_executor::() - .with(eq(StopExecutorRequest { - executor_id: executor.executor_id.clone(), - })) - .returning(|_| { - Ok(Response::new(StopExecutorResponse { - executor_id: "executor_id".to_string(), - })) - }) - .once(); - mock_client - .expect_start_executor::() - .with(eq(StartExecutorRequest { - code: config.code.clone(), - schema: config.schema.clone(), - redis_stream: config.get_redis_stream_key(), - version: config.get_registry_version(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - })) - .returning(|_| { - Ok(tonic::Response::new(StartExecutorResponse { - executor_id: "executor_id".to_string(), - })) - }) - .once(); - mock_client - .expect_get_executor::() - .with(always()) - .returning(move |_| Ok(Response::new(executor.clone()))) - .once(); + let mock_client = ExecutorsClientWrapper::default(); let handler = ExecutorsHandlerImpl { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + assert!(!handler.is_healthy(executor)); } #[tokio::test] - async fn ignores_healthy_executors() { + async fn healthy_executors() { tokio::time::pause(); let config = IndexerConfig::default(); @@ -408,23 +415,13 @@ mod tests { }), }; - let mut mock_client = ExecutorsClientWrapper::default(); - mock_client - .expect_stop_executor::() - .never(); - mock_client - .expect_start_executor::() - .never(); - mock_client - .expect_get_executor::() - .with(always()) - .returning(move |_| Ok(Response::new(executor.clone()))); + let mock_client = ExecutorsClientWrapper::default(); let handler = ExecutorsHandlerImpl { client: mock_client, }; - handler.synchronise(&config).await.unwrap() + assert!(handler.is_healthy(executor)); } } } diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index b6715112..3d29967e 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -1,14 +1,15 @@ use tracing::{info, warn}; -use crate::handlers::block_streams::BlockStreamsHandler; +use crate::handlers::block_streams::{BlockStreamStatus, BlockStreamsHandler}; use crate::handlers::data_layer::DataLayerHandler; -use crate::handlers::executors::ExecutorsHandler; +use crate::handlers::executors::{ExecutorStatus, ExecutorsHandler}; use crate::indexer_config::IndexerConfig; use crate::indexer_state::{IndexerState, IndexerStateManager}; use crate::redis::{KeyProvider, RedisClient}; use crate::registry::Registry; const LOOP_THROTTLE_MS: u64 = 1000; +const RESTART_TIMEOUT_SECONDS: u64 = 600; /// Represents the different lifecycle states of an Indexer #[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -124,18 +125,55 @@ impl<'a> LifecycleManager<'a> { return LifecycleState::Suspending; } - if let Err(error) = self + let stream_status = match self .block_streams_handler - .synchronise(config, state.block_stream_synced_at) + .get_status(config, state.block_stream_synced_at) .await { + Ok(status) => status, + Err(error) => { + warn!(?error, "Failed to get block stream status"); + return LifecycleState::Running; + } + }; + + if let Err(error) = match stream_status { + BlockStreamStatus::Active => Ok(()), + BlockStreamStatus::Inactive => self.block_streams_handler.resume(config).await, + BlockStreamStatus::Outdated => self.block_streams_handler.reconfigure(config).await, + BlockStreamStatus::Unhealthy => { + tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; + self.block_streams_handler.restart(config).await + } + BlockStreamStatus::NotStarted => { + self.block_streams_handler + .start_new_block_stream(config) + .await + } + } { warn!(?error, "Failed to synchronise block stream, retrying..."); return LifecycleState::Running; } state.block_stream_synced_at = Some(config.get_registry_version()); - if let Err(error) = self.executors_handler.synchronise(config).await { + let executor_status = match self.executors_handler.get_status(config).await { + Ok(status) => status, + Err(error) => { + warn!(?error, "Failed to synchronise executor"); + return LifecycleState::Running; + } + }; + + if let Err(error) = match executor_status { + ExecutorStatus::Active => Ok(()), + ExecutorStatus::Inactive => self.executors_handler.start(config).await, + ExecutorStatus::Outdated => self.executors_handler.restart(config).await, + ExecutorStatus::Unhealthy => { + tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await; + self.executors_handler.restart(config).await + } + } { warn!(?error, "Failed to synchronise executor, retrying..."); return LifecycleState::Running; } @@ -621,53 +659,162 @@ mod tests { } #[tokio::test] - async fn synchronises_streams_and_executors() { + async fn ignores_active_stream() { let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; let mut block_streams_handler = BlockStreamsHandler::default(); block_streams_handler - .expect_synchronise() - .returning(|_, _| Ok(())) + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn restarts_unhealthy_stream() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Unhealthy)); + block_streams_handler + .expect_restart() + .returning(|_| Ok(())) .once(); let mut executors_handler = ExecutorsHandler::default(); executors_handler - .expect_synchronise() + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn resumes_inactive_streams() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Inactive)); + block_streams_handler + .expect_resume() .returning(|_| Ok(())) .once(); + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); - let mut registry = Registry::default(); - registry - .expect_fetch_indexer() - .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_get_state().returning(|_| { - Ok(IndexerState { - lifecycle_state: LifecycleState::Running, - account_id: "near".parse().unwrap(), - function_name: "function_name".to_string(), - enabled: true, - block_stream_synced_at: None, - }) - }); - state_manager - .expect_set_state() - .with( - always(), - function(|state: &IndexerState| { - state.lifecycle_state == LifecycleState::Running - && state.block_stream_synced_at == Some(2) - }), - ) - .returning(|_, _| Ok(())); + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn reconfigures_outdated_streams() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Outdated)); + block_streams_handler + .expect_reconfigure() + .returning(|_| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); let redis_client = RedisClient::default(); let lifecycle_manager = LifecycleManager::new( - config, + config.clone(), &block_streams_handler, &executors_handler, &data_layer_handler, @@ -676,7 +823,218 @@ mod tests { &redis_client, ); - lifecycle_manager.handle_transitions(true).await; + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn starts_new_streams() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::NotStarted)); + block_streams_handler + .expect_start_new_block_stream() + .returning(|_| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn ignores_active_executors() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Active)); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn starts_inactive_executors() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Inactive)); + executors_handler + .expect_start() + .returning(|_| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn restarts_unhealthy_executor() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Unhealthy)); + executors_handler + .expect_restart() + .returning(|_| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; + } + + #[tokio::test] + async fn restarts_outdated_executor() { + let config = IndexerConfig::default(); + let mut state = IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: config.account_id.clone(), + function_name: config.function_name.clone(), + enabled: true, + block_stream_synced_at: None, + }; + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_get_status() + .returning(|_, _| Ok(BlockStreamStatus::Active)); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_get_status() + .returning(|_| Ok(ExecutorStatus::Outdated)); + executors_handler + .expect_restart() + .returning(|_| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + let state_manager = IndexerStateManager::default(); + let registry = Registry::default(); + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config.clone(), + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_running(&config, &mut state).await; } }