diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index c3dc21e4..b1c004b0 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -67,13 +67,19 @@ impl BlockStreamsClientWrapperImpl { } } +#[cfg(not(test))] +pub use BlockStreamsHandlerImpl as BlockStreamsHandler; +#[cfg(test)] +pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler; + #[derive(Clone)] -pub struct BlockStreamsHandler { +pub struct BlockStreamsHandlerImpl { client: BlockStreamsClientWrapper, redis_client: RedisClient, } -impl BlockStreamsHandler { +#[cfg_attr(test, mockall::automock)] +impl BlockStreamsHandlerImpl { pub fn connect(block_streamer_url: &str, redis_client: RedisClient) -> anyhow::Result { let channel = Channel::from_shared(block_streamer_url.to_string()) .context("Block Streamer URL is invalid")? @@ -368,6 +374,12 @@ mod tests { } } + impl Clone for MockBlockStreamsHandlerImpl { + fn clone(&self) -> Self { + Self::default() + } + } + #[tokio::test] async fn resumes_stopped_streams() { let config = IndexerConfig::default(); @@ -401,7 +413,7 @@ mod tests { .expect_get_last_published_block::() .returning(move |_| Ok(Some(last_published_block))); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; @@ -463,7 +475,7 @@ mod tests { .returning(|_| Ok(())) .once(); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; @@ -507,7 +519,7 @@ mod tests { let mock_redis = RedisClient::default(); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; @@ -551,7 +563,7 @@ mod tests { .returning(|_| Ok(())) .once(); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; @@ -617,7 +629,7 @@ mod tests { .expect_get_last_published_block::() .returning(move |_| Ok(Some(last_published_block))); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; @@ -672,7 +684,7 @@ mod tests { let mock_redis = RedisClient::default(); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; @@ -723,7 +735,7 @@ mod tests { .returning(|_| Ok(None)) .once(); - let handler = BlockStreamsHandler { + let handler = BlockStreamsHandlerImpl { client: mock_client, redis_client: mock_redis, }; diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index e0ea273c..4b50f38b 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -73,12 +73,18 @@ impl DataLayerClientWrapperImpl { } } +#[cfg(not(test))] +pub use DataLayerHandlerImpl as DataLayerHandler; +#[cfg(test)] +pub use MockDataLayerHandlerImpl as DataLayerHandler; + #[derive(Clone)] -pub struct DataLayerHandler { +pub struct DataLayerHandlerImpl { client: DataLayerClientWrapper, } -impl DataLayerHandler { +#[cfg_attr(test, mockall::automock)] +impl DataLayerHandlerImpl { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -249,6 +255,12 @@ mod tests { use mockall::predicate::*; + impl Clone for MockDataLayerHandlerImpl { + fn clone(&self) -> Self { + Self::default() + } + } + #[tokio::test] async fn provisions_data_layer() { let config = IndexerConfig::default(); @@ -290,7 +302,7 @@ mod tests { }) .once(); - let handler = DataLayerHandler { + let handler = DataLayerHandlerImpl { client: mock_client, }; @@ -329,7 +341,7 @@ mod tests { }) .times(610); - let handler = DataLayerHandler { + let handler = DataLayerHandlerImpl { client: mock_client, }; @@ -371,7 +383,7 @@ mod tests { }) .once(); - let handler = DataLayerHandler { + let handler = DataLayerHandlerImpl { client: mock_client, }; @@ -423,7 +435,7 @@ mod tests { }) .once(); - let handler = DataLayerHandler { + let handler = DataLayerHandlerImpl { client: mock_client, }; @@ -464,7 +476,7 @@ mod tests { }) .times(610); - let handler = DataLayerHandler { + let handler = DataLayerHandlerImpl { client: mock_client, }; @@ -507,7 +519,7 @@ mod tests { }) .once(); - let handler = DataLayerHandler { + let handler = DataLayerHandlerImpl { client: mock_client, }; diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 4fc67a67..8b761e99 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -63,12 +63,18 @@ impl ExecutorsClientWrapperImpl { } } +#[cfg(not(test))] +pub use ExecutorsHandlerImpl as ExecutorsHandler; +#[cfg(test)] +pub use MockExecutorsHandlerImpl as ExecutorsHandler; + #[derive(Clone)] -pub struct ExecutorsHandler { +pub struct ExecutorsHandlerImpl { client: ExecutorsClientWrapper, } -impl ExecutorsHandler { +#[cfg_attr(test, mockall::automock)] +impl ExecutorsHandlerImpl { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -225,6 +231,12 @@ mod tests { } } + impl Clone for MockExecutorsHandlerImpl { + fn clone(&self) -> Self { + Self::default() + } + } + #[tokio::test] async fn resumes_stopped_executors() { let config = IndexerConfig::default(); @@ -255,7 +267,7 @@ mod tests { }) .once(); - let handler = ExecutorsHandler { + let handler = ExecutorsHandlerImpl { client: mock_client, }; @@ -308,7 +320,7 @@ mod tests { .returning(move |_| Ok(Response::new(executor.clone()))) .once(); - let handler = ExecutorsHandler { + let handler = ExecutorsHandlerImpl { client: mock_client, }; @@ -365,7 +377,7 @@ mod tests { .returning(move |_| Ok(Response::new(executor.clone()))) .once(); - let handler = ExecutorsHandler { + let handler = ExecutorsHandlerImpl { client: mock_client, }; @@ -408,7 +420,7 @@ mod tests { .with(always()) .returning(move |_| Ok(Response::new(executor.clone()))); - let handler = ExecutorsHandler { + let handler = ExecutorsHandlerImpl { client: mock_client, }; diff --git a/coordinator/src/indexer_config.rs b/coordinator/src/indexer_config.rs index a29eab2d..233cf852 100644 --- a/coordinator/src/indexer_config.rs +++ b/coordinator/src/indexer_config.rs @@ -40,7 +40,7 @@ impl Default for IndexerConfig { }, created_at_block_height: 1, updated_at_block_height: Some(2), - deleted_at_block_height: Some(3), + deleted_at_block_height: None, start_block: StartBlock::Height(100), } } diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index 11bb1cfa..b6715112 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -254,6 +254,79 @@ impl<'a> LifecycleManager<'a> { // LifecycleState::Deleted } + pub async fn handle_transitions(&self, first_iteration: bool) -> bool { + let config = match self + .registry + .fetch_indexer( + &self.initial_config.account_id, + &self.initial_config.function_name, + ) + .await + { + Ok(Some(config)) => config, + Ok(None) => { + warn!("No matching indexer config was found"); + return false; + } + Err(error) => { + warn!(?error, "Failed to fetch config"); + return false; + } + }; + + let mut state = match self.state_manager.get_state(&self.initial_config).await { + Ok(state) => state, + Err(error) => { + warn!(?error, "Failed to get state"); + return false; + } + }; + + if first_iteration { + info!("Initial lifecycle state: {:?}", state.lifecycle_state); + } + + let desired_lifecycle_state = match state.lifecycle_state { + LifecycleState::Initializing => self.handle_initializing(&config, &state).await, + LifecycleState::Running => self.handle_running(&config, &mut state).await, + LifecycleState::Suspending => self.handle_suspending(&config).await, + LifecycleState::Suspended => self.handle_suspended(&config, &state).await, + LifecycleState::Repairing => self.handle_repairing(&config, &state).await, + LifecycleState::Deleting => self.handle_deleting(&state).await, + LifecycleState::Deleted => LifecycleState::Deleted, + }; + + if desired_lifecycle_state != state.lifecycle_state { + info!( + "Transitioning lifecycle state: {:?} -> {:?}", + state.lifecycle_state, desired_lifecycle_state, + ); + } + + if desired_lifecycle_state == LifecycleState::Deleted { + return true; + } + + state.lifecycle_state = desired_lifecycle_state; + + loop { + match self + .state_manager + .set_state(&self.initial_config, state.clone()) + .await + { + Ok(_) => break, + Err(e) => { + warn!("Failed to set state: {:?}. Retrying...", e); + + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } + } + } + + false + } + #[tracing::instrument( name = "lifecycle_manager", skip(self), @@ -267,76 +340,823 @@ impl<'a> LifecycleManager<'a> { loop { tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await; + let should_exit = self.handle_transitions(first_iteration).await; + + if should_exit { + break; + } + + first_iteration = false; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use mockall::predicate::*; + + mod initializing { + use super::*; + + #[tokio::test] + async fn transitions_to_running_on_provisioning_success() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); - let config = match self - .registry - .fetch_indexer( - &self.initial_config.account_id, - &self.initial_config.function_name, + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_ensure_provisioned() + .returning(|_| Ok(())); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Initializing, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Running + }), ) - .await - { - Ok(Some(config)) => config, - Ok(None) => { - warn!("No matching indexer config was found"); - continue; - } - Err(error) => { - warn!(?error, "Failed to fetch config"); - continue; - } - }; + .returning(|_, _| Ok(())); - let mut state = match self.state_manager.get_state(&self.initial_config).await { - Ok(state) => state, - Err(error) => { - warn!(?error, "Failed to get state"); - continue; - } - }; + let redis_client = RedisClient::default(); - if first_iteration { - info!("Initial lifecycle state: {:?}", state.lifecycle_state,); - first_iteration = false; - } + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); - let desired_lifecycle_state = match state.lifecycle_state { - LifecycleState::Initializing => self.handle_initializing(&config, &state).await, - LifecycleState::Running => self.handle_running(&config, &mut state).await, - LifecycleState::Suspending => self.handle_suspending(&config).await, - LifecycleState::Suspended => self.handle_suspended(&config, &state).await, - LifecycleState::Repairing => self.handle_repairing(&config, &state).await, - LifecycleState::Deleting => self.handle_deleting(&state).await, - LifecycleState::Deleted => LifecycleState::Deleted, - }; - - if desired_lifecycle_state != state.lifecycle_state { - info!( - "Transitioning lifecycle state: {:?} -> {:?}", - state.lifecycle_state, desired_lifecycle_state, - ); - } + lifecycle_manager.handle_transitions(true).await; + } - if desired_lifecycle_state == LifecycleState::Deleted { - break; - } + #[tokio::test] + async fn transitions_to_repairing_on_provisioning_failure() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); - state.lifecycle_state = desired_lifecycle_state; + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_ensure_provisioned() + .returning(|_| anyhow::bail!("failed")); - loop { - match self - .state_manager - .set_state(&self.initial_config, state.clone()) - .await - { - Ok(_) => break, - Err(e) => { - warn!("Failed to set state: {:?}. Retrying...", e); + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; - } - } - } + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Initializing, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Repairing + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn transitions_to_deleting_on_delete() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + + let mut data_layer_handler = DataLayerHandler::default(); + data_layer_handler + .expect_ensure_provisioned() + .returning(|_| anyhow::bail!("failed")); + + let mut registry = Registry::default(); + registry.expect_fetch_indexer().returning(move |_, _| { + Ok(Some(IndexerConfig { + deleted_at_block_height: Some(3), + ..Default::default() + })) + }); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Initializing, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleting + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + } + + mod running { + use super::*; + + #[tokio::test] + async fn transitions_to_deleting_on_delete() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry.expect_fetch_indexer().returning(move |_, _| { + Ok(Some(IndexerConfig { + deleted_at_block_height: Some(3), + ..Default::default() + })) + }); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleting + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn transitions_to_suspending_on_disabled() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry.expect_fetch_indexer().returning(move |_, _| { + Ok(Some(IndexerConfig { + deleted_at_block_height: Some(3), + ..Default::default() + })) + }); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: false, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleting + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn synchronises_streams_and_executors() { + let config = IndexerConfig::default(); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_synchronise() + .returning(|_, _| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_synchronise() + .returning(|_| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Running, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Running + && state.block_stream_synced_at == Some(2) + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + } + + mod suspending { + use super::*; + + #[tokio::test] + async fn transitions_to_deleting_on_delete() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry.expect_fetch_indexer().returning(move |_, _| { + Ok(Some(IndexerConfig { + deleted_at_block_height: Some(3), + ..Default::default() + })) + }); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Suspending, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleting + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn stops_streams_and_executors() { + let config = IndexerConfig::default(); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_stop_if_needed() + .returning(|_, _| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_stop_if_needed() + .returning(|_, _| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Suspending, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Suspended + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + } + + mod suspended { + use super::*; + + #[tokio::test] + async fn transitions_to_deleting_on_delete() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry.expect_fetch_indexer().returning(move |_, _| { + Ok(Some(IndexerConfig { + deleted_at_block_height: Some(3), + ..Default::default() + })) + }); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Suspended, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: false, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleting + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn transitions_to_running_on_enabled() { + let config = IndexerConfig::default(); + + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Suspended, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Running + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn transitions_to_suspended() { + let config = IndexerConfig::default(); + + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Suspended, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: false, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Suspended + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + } + + mod repairing { + use super::*; + + #[tokio::test] + async fn transitions_to_deleting_on_delete() { + let config = IndexerConfig::default(); + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry.expect_fetch_indexer().returning(move |_, _| { + Ok(Some(IndexerConfig { + deleted_at_block_height: Some(3), + ..Default::default() + })) + }); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Repairing, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleting + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + + #[tokio::test] + async fn transitions_to_repairing() { + let config = IndexerConfig::default(); + + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Repairing, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Repairing + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + } + + mod deleting { + use super::*; + + #[tokio::test] + async fn stops_streams_and_executors() { + let config = IndexerConfig::default(); + + let mut block_streams_handler = BlockStreamsHandler::default(); + block_streams_handler + .expect_stop_if_needed() + .returning(|_, _| Ok(())) + .once(); + + let mut executors_handler = ExecutorsHandler::default(); + executors_handler + .expect_stop_if_needed() + .returning(|_, _| Ok(())) + .once(); + + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Deleting, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleted + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.handle_transitions(true).await; + } + } + + mod deleted { + use super::*; + + #[tokio::test] + async fn exits() { + tokio::time::pause(); + + let config = IndexerConfig::default(); + + let block_streams_handler = BlockStreamsHandler::default(); + let executors_handler = ExecutorsHandler::default(); + let data_layer_handler = DataLayerHandler::default(); + + let mut registry = Registry::default(); + registry + .expect_fetch_indexer() + .returning(move |_, _| Ok(Some(IndexerConfig::default()))); + + let mut state_manager = IndexerStateManager::default(); + state_manager.expect_get_state().returning(|_| { + Ok(IndexerState { + lifecycle_state: LifecycleState::Deleted, + account_id: "near".parse().unwrap(), + function_name: "function_name".to_string(), + enabled: true, + block_stream_synced_at: None, + }) + }); + state_manager + .expect_set_state() + .with( + always(), + function(|state: &IndexerState| { + state.lifecycle_state == LifecycleState::Deleted + }), + ) + .returning(|_, _| Ok(())); + + let redis_client = RedisClient::default(); + + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &state_manager, + &redis_client, + ); + + lifecycle_manager.run().await; } } }