diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 5d1e0517a..92d3a2c90 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -12,6 +12,17 @@ service BlockStreamer { // Lists all current BlockStream processes rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); + + // Get info for an existing BlockStream process + rpc GetStream (GetStreamRequest) returns (StreamInfo); +} + +// Request message for getting a BlockStream +message GetStreamRequest { + // Account ID which the indexer is defined under + string account_id = 1; + // Name of the indexer + string function_name = 2; } // Request message for starting a BlockStream diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 94ccb22aa..88d426b9c 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -59,6 +59,38 @@ impl BlockStreamerService { #[tonic::async_trait] impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService { + #[tracing::instrument(skip(self))] + async fn get_stream( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + + let stream_entry = lock.iter().find(|(_, block_stream)| { + block_stream.indexer_config.account_id == request.account_id + && block_stream.indexer_config.function_name == request.function_name + }); + + if let Some((stream_id, stream)) = stream_entry { + Ok(Response::new(StreamInfo { + stream_id: stream_id.to_string(), + account_id: stream.indexer_config.account_id.to_string(), + function_name: stream.indexer_config.function_name.to_string(), + version: stream.version, + })) + } else { + Err(Status::not_found(format!( + "Block Stream for account {} and name {} does not exist", + request.account_id, request.function_name + ))) + } + } + #[tracing::instrument(skip(self))] async fn start_stream( &self, @@ -171,7 +203,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic &self, _request: Request, ) -> Result, Status> { - let lock = self.block_streams.lock().unwrap(); + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + let block_streams: Vec = lock .values() .map(|block_stream| StreamInfo { @@ -234,6 +270,63 @@ mod tests { ) } + #[tokio::test] + async fn get_existing_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + block_streamer_service + .start_stream(Request::new(StartStreamRequest { + start_block_height: 0, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 0, + redis_stream: "stream".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: 1, + })), + })) + .await + .unwrap(); + + let stream = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await + .unwrap(); + + assert_eq!( + stream.into_inner().stream_id, + "16210176318434468568".to_string() + ); + } + + #[tokio::test] + async fn get_non_existant_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + let stream_response = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await; + + assert_eq!(stream_response.err().unwrap().code(), tonic::Code::NotFound); + } + #[tokio::test] async fn starts_a_block_stream() { let block_streamer_service = create_block_streamer_service(); diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index f88a412d5..bc34c000b 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -5,34 +5,36 @@ pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ - start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest, - StartStreamRequest, Status, StopStreamRequest, + start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, + ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest, }; +use near_primitives::types::AccountId; +use registry_types::StartBlock; use tonic::transport::channel::Channel; use tonic::Request; use crate::indexer_config::IndexerConfig; -use crate::redis::KeyProvider; +use crate::redis::{KeyProvider, RedisClient}; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use BlockStreamsHandlerImpl as BlockStreamsHandler; -#[cfg(test)] -pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler; - -pub struct BlockStreamsHandlerImpl { +#[derive(Clone)] +pub struct BlockStreamsHandler { client: BlockStreamerClient, + redis_client: RedisClient, } #[cfg_attr(test, mockall::automock)] -impl BlockStreamsHandlerImpl { - pub fn connect(block_streamer_url: &str) -> anyhow::Result { +impl BlockStreamsHandler { + pub fn connect(block_streamer_url: &str, redis_client: RedisClient) -> anyhow::Result { let channel = Channel::from_shared(block_streamer_url.to_string()) .context("Block Streamer URL is invalid")? .connect_lazy(); let client = BlockStreamerClient::new(channel); - Ok(Self { client }) + Ok(Self { + client, + redis_client, + }) } pub async fn list(&self) -> anyhow::Result> { @@ -79,6 +81,26 @@ impl BlockStreamsHandlerImpl { .into() } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetStreamRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self.client.clone().get_stream(Request::new(request)).await { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get stream for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start( &self, start_block_height: u64, @@ -139,4 +161,134 @@ impl BlockStreamsHandlerImpl { Ok(()) } + + async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if matches!( + config.start_block, + StartBlock::Latest | StartBlock::Height(..) + ) { + self.redis_client.clear_block_stream(config).await?; + } + + let height = match config.start_block { + StartBlock::Latest => config.get_registry_version(), + StartBlock::Height(height) => height, + StartBlock::Continue => self.get_continuation_block_height(config).await?, + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await?; + + Ok(()) + } + + async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = match config.start_block { + StartBlock::Height(height) => height, + StartBlock::Latest => config.get_registry_version(), + StartBlock::Continue => { + tracing::warn!( + "Attempted to start new Block Stream with CONTINUE, using LATEST instead" + ); + config.get_registry_version() + } + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await + } + + async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result { + let height = self + .redis_client + .get_last_published_block(config) + .await? + .map(|height| height + 1) + .unwrap_or_else(|| { + tracing::warn!( + "Failed to get continuation block height, using registry version instead" + ); + + config.get_registry_version() + }); + + Ok(height) + } + + async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = self.get_continuation_block_height(config).await?; + + tracing::info!(height, "Resuming block stream"); + + self.start(height, config).await?; + + Ok(()) + } + + pub async fn synchronise_block_stream( + &self, + config: &IndexerConfig, + previous_sync_version: Option, + ) -> anyhow::Result<()> { + let block_stream = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(block_stream) = block_stream { + if block_stream.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + previous_version = block_stream.version, + "Stopping outdated block stream" + ); + + self.stop(block_stream.stream_id.clone()).await?; + + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.is_none() { + self.start_new_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.unwrap() != config.get_registry_version() { + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + self.resume_block_stream(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(block_stream) = self.get(account_id, function_name).await? { + tracing::info!("Stopping block stream"); + + self.stop(block_stream.stream_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 68537b3c7..3e2df54dc 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -8,23 +8,18 @@ use anyhow::Context; use runner::data_layer::data_layer_client::DataLayerClient; use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest}; use tonic::transport::channel::Channel; -use tonic::Request; +use tonic::{Request, Status}; use crate::indexer_config::IndexerConfig; -#[cfg(not(test))] -pub use DataLayerHandlerImpl as DataLayerHandler; -#[cfg(test)] -pub use MockDataLayerHandlerImpl as DataLayerHandler; - type TaskId = String; -pub struct DataLayerHandlerImpl { +#[derive(Clone)] +pub struct DataLayerHandler { client: DataLayerClient, } -#[cfg_attr(test, mockall::automock)] -impl DataLayerHandlerImpl { +impl DataLayerHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -37,7 +32,7 @@ impl DataLayerHandlerImpl { pub async fn start_provisioning_task( &self, indexer_config: &IndexerConfig, - ) -> anyhow::Result { + ) -> Result { let request = ProvisionRequest { account_id: indexer_config.account_id.to_string(), function_name: indexer_config.function_name.clone(), @@ -98,4 +93,81 @@ impl DataLayerHandlerImpl { Ok(status) } + + pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer"); + + let start_task_result = self.start_provisioning_task(indexer_config).await; + + if let Err(error) = start_task_result { + // Already provisioned + if error.code() == tonic::Code::FailedPrecondition { + return Ok(()); + } + + return Err(error.into()); + } + + let task_id = start_task_result.unwrap(); + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?indexer_config.account_id, + ?indexer_config.function_name, + "Still waiting for provisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } + + pub async fn ensure_deprovisioned( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + tracing::info!(?account_id, ?function_name, "Deprovisioning data layer"); + + let task_id = self + .start_deprovisioning_task(account_id.clone(), function_name.clone()) + .await?; + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?account_id, + ?function_name, + "Still waiting for deprovisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 686164048..01c9cc4be 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -1,10 +1,11 @@ #![cfg_attr(test, allow(dead_code))] +use near_primitives::types::AccountId; pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; use tonic::transport::channel::Channel; use tonic::Request; @@ -12,17 +13,12 @@ use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use ExecutorsHandlerImpl as ExecutorsHandler; -#[cfg(test)] -pub use MockExecutorsHandlerImpl as ExecutorsHandler; - -pub struct ExecutorsHandlerImpl { +#[derive(Clone)] +pub struct ExecutorsHandler { client: RunnerClient, } -#[cfg_attr(test, mockall::automock)] -impl ExecutorsHandlerImpl { +impl ExecutorsHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? @@ -50,6 +46,31 @@ impl ExecutorsHandlerImpl { .await } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetExecutorRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self + .client + .clone() + .get_executor(Request::new(request)) + .await + { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get executor for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let request = StartExecutorRequest { code: indexer_config.code.clone(), @@ -97,4 +118,49 @@ impl ExecutorsHandlerImpl { Ok(()) } + + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let executor = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(executor) = executor { + if executor.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = executor.version, + "Stopping outdated executor" + ); + + self.stop(executor.executor_id).await?; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = config.get_registry_version(), + "Starting executor" + ); + + self.start(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(executor) = self.get(account_id, function_name).await? { + tracing::info!("Stopping executor"); + self.stop(executor.executor_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 2539e06a2..1200648f0 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -4,6 +4,7 @@ use anyhow::Context; use near_primitives::types::AccountId; use crate::indexer_config::IndexerConfig; +use crate::lifecycle::LifecycleState; use crate::redis::{KeyProvider, RedisClient}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -16,7 +17,7 @@ pub enum ProvisionedState { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct IndexerState { +pub struct OldIndexerState { pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, @@ -24,6 +25,15 @@ pub struct IndexerState { pub provisioned_state: ProvisionedState, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct IndexerState { + pub account_id: AccountId, + pub function_name: String, + pub block_stream_synced_at: Option, + pub enabled: bool, + pub lifecycle_state: LifecycleState, +} + impl KeyProvider for IndexerState { fn account_id(&self) -> String { self.account_id.to_string() @@ -49,13 +59,46 @@ impl IndexerStateManagerImpl { Self { redis_client } } + pub async fn migrate(&self) -> anyhow::Result<()> { + let raw_states = self.redis_client.list_indexer_states().await?; + + for raw_state in raw_states { + if let Ok(state) = serde_json::from_str::(&raw_state) { + tracing::info!( + "{}/{} already migrated, skipping", + state.account_id, + state.function_name + ); + continue; + } + + tracing::info!("Migrating {}", raw_state); + + let old_state: OldIndexerState = serde_json::from_str(&raw_state)?; + + let state = IndexerState { + account_id: old_state.account_id, + function_name: old_state.function_name, + block_stream_synced_at: old_state.block_stream_synced_at, + enabled: old_state.enabled, + lifecycle_state: LifecycleState::Running, + }; + + self.redis_client + .set(state.get_state_key(), serde_json::to_string(&state)?) + .await?; + } + + Ok(()) + } + fn get_default_state(&self, indexer_config: &IndexerConfig) -> IndexerState { IndexerState { account_id: indexer_config.account_id.clone(), function_name: indexer_config.function_name.clone(), block_stream_synced_at: None, enabled: true, - provisioned_state: ProvisionedState::Unprovisioned, + lifecycle_state: LifecycleState::default(), } } @@ -76,10 +119,12 @@ impl IndexerStateManagerImpl { } pub async fn delete_state(&self, indexer_state: &IndexerState) -> anyhow::Result<()> { + tracing::info!("Deleting state"); + self.redis_client.delete_indexer_state(indexer_state).await } - async fn set_state( + pub async fn set_state( &self, indexer_config: &IndexerConfig, state: IndexerState, @@ -101,58 +146,6 @@ impl IndexerStateManagerImpl { Ok(()) } - pub async fn set_deprovisioning( - &self, - indexer_state: &IndexerState, - task_id: String, - ) -> anyhow::Result<()> { - let mut state = indexer_state.clone(); - - state.provisioned_state = ProvisionedState::Deprovisioning { task_id }; - - self.redis_client - .set(state.get_state_key(), serde_json::to_string(&state)?) - .await?; - - Ok(()) - } - - pub async fn set_provisioning( - &self, - indexer_config: &IndexerConfig, - task_id: String, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id }; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - - pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioned; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_provisioning_failure( - &self, - indexer_config: &IndexerConfig, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Failed; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_enabled( &self, indexer_config: &IndexerConfig, @@ -194,7 +187,7 @@ mod tests { let mut mock_redis_client = RedisClient::default(); mock_redis_client .expect_list_indexer_states() - .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()])) + .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string()])) .once(); mock_redis_client .expect_list_indexer_states() @@ -229,7 +222,7 @@ mod tests { .with(predicate::eq(indexer_config.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" }) + serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "lifecycle_state": "Initializing" }) .to_string(), )) }); @@ -237,7 +230,7 @@ mod tests { .expect_set_indexer_state::() .with( predicate::always(), - predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()), + predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"lifecycle_state\":\"Initializing\"}".to_string()), ) .returning(|_, _| Ok(())) .once(); diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs new file mode 100644 index 000000000..70c9f7b38 --- /dev/null +++ b/coordinator/src/lifecycle.rs @@ -0,0 +1,343 @@ +use tracing::{info, warn}; + +use crate::handlers::block_streams::BlockStreamsHandler; +use crate::handlers::data_layer::DataLayerHandler; +use crate::handlers::executors::ExecutorsHandler; +use crate::indexer_config::IndexerConfig; +use crate::indexer_state::{IndexerState, IndexerStateManager}; +use crate::redis::{KeyProvider, RedisClient}; +use crate::registry::Registry; + +const LOOP_THROTTLE_MS: u64 = 1000; + +/// Represents the different lifecycle states of an Indexer +#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum LifecycleState { + /// Pre-requisite resources, i.e. Data Layer are being created. + /// + /// Transitions: + /// - `Running` on success + /// - `Repairing` on Data Layer provisioning failure + #[default] + Initializing, + /// Indexer is functional, Block Stream and Executors are continouously monitored to ensure + /// they are running the latest version of the Indexer. + /// + /// Transitions: + /// - `Stopping` if suspended + /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a + /// retry + /// - `Running` on success + Running, + /// Indexer is being stopped, Block Stream and Executors are being stopped. + /// + /// Transitions: + /// - `Stopping` on failure, triggering a retry + /// - `Stopped` on success + Stopping, + /// Indexer is stopped, Block Stream and Executors are not running. + /// + /// Transitions: + /// - `Running` if unsuspended + Stopped, + /// Indexer is in a bad state, currently requires manual intervention, but should eventually + /// self heal. This is a dead-end state + /// + /// Transitions: + /// - `Repairing` continuously + Repairing, // TODO Add `error` to enable reparation + /// Indexer is being deleted, all resources are being cleaned up + /// + /// Transitions: + /// - `Deleting` on failure, triggering a retry + /// - `Deleted` on success + Deleting, + /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit + Deleted, +} + +pub struct LifecycleManager<'a> { + initial_config: IndexerConfig, + block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, +} + +impl<'a> LifecycleManager<'a> { + pub fn new( + initial_config: IndexerConfig, + block_streams_handler: &'a BlockStreamsHandler, + executors_handler: &'a ExecutorsHandler, + data_layer_handler: &'a DataLayerHandler, + registry: &'a Registry, + state_manager: &'a IndexerStateManager, + redis_client: &'a RedisClient, + ) -> Self { + Self { + initial_config, + block_streams_handler, + executors_handler, + data_layer_handler, + registry, + state_manager, + redis_client, + } + } + + #[tracing::instrument(name = "initializing", skip_all)] + async fn handle_initializing( + &self, + config: Option<&IndexerConfig>, + _state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if self + .data_layer_handler + .ensure_provisioned(config) + .await + .is_err() + { + return LifecycleState::Repairing; + } + + LifecycleState::Running + } + + #[tracing::instrument(name = "running", skip_all)] + async fn handle_running( + &self, + config: Option<&IndexerConfig>, + state: &mut IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if !state.enabled { + return LifecycleState::Stopping; + } + + if let Err(error) = self + .block_streams_handler + .synchronise_block_stream(config, state.block_stream_synced_at) + .await + { + warn!(?error, "Failed to synchronise block stream, retrying..."); + + return LifecycleState::Running; + } + + state.block_stream_synced_at = Some(config.get_registry_version()); + + if let Err(error) = self.executors_handler.synchronise_executor(config).await { + warn!(?error, "Failed to synchronise executor, retrying..."); + + return LifecycleState::Running; + } + + LifecycleState::Running + } + + #[tracing::instrument(name = "stopping", skip_all)] + async fn handle_stopping(&self, config: Option<&IndexerConfig>) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + let config = config.unwrap(); + + if let Err(error) = self + .block_streams_handler + .stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream, retrying..."); + return LifecycleState::Stopping; + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(config.account_id.clone(), config.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor, retrying..."); + return LifecycleState::Stopping; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "stopped", skip_all)] + async fn handle_stopped( + &self, + config: Option<&IndexerConfig>, + state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Transistion to `Running` on config update + + if state.enabled { + return LifecycleState::Running; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "repairing", skip_all)] + async fn handle_repairing( + &self, + config: Option<&IndexerConfig>, + _state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Add more robust error handling, for now just stop + LifecycleState::Repairing + } + + #[tracing::instrument(name = "deleting", skip_all)] + async fn handle_deleting(&self, state: &IndexerState) -> LifecycleState { + if let Err(error) = self + .block_streams_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream"); + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor"); + } + + if self.state_manager.delete_state(state).await.is_err() { + // Retry + return LifecycleState::Deleting; + } + + info!("Clearing block stream"); + + if self + .redis_client + .del(state.get_redis_stream_key()) + .await + .is_err() + { + // Retry + return LifecycleState::Deleting; + } + + if self + .data_layer_handler + .ensure_deprovisioned(state.account_id.clone(), state.function_name.clone()) + .await + .is_err() + { + return LifecycleState::Deleted; + } + + LifecycleState::Deleted + } + + #[tracing::instrument( + name = "lifecycle_manager", + skip(self), + fields( + account_id = self.initial_config.account_id.as_str(), + function_name = self.initial_config.function_name.as_str() + ) + )] + pub async fn run(&self) { + let mut first_iteration = true; + + loop { + tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await; + + let config = match self + .registry + .fetch_indexer( + &self.initial_config.account_id, + &self.initial_config.function_name, + ) + .await + { + Ok(config) => config, + Err(error) => { + warn!(?error, "Failed to fetch config"); + continue; + } + }; + + let mut state = match self.state_manager.get_state(&self.initial_config).await { + Ok(state) => state, + Err(error) => { + warn!(?error, "Failed to get state"); + continue; + } + }; + + if first_iteration { + info!("Initial lifecycle state: {:?}", state.lifecycle_state,); + first_iteration = false; + } + + let desired_lifecycle_state = match state.lifecycle_state { + LifecycleState::Initializing => { + self.handle_initializing(config.as_ref(), &state).await + } + LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await, + LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await, + LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await, + LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await, + LifecycleState::Deleting => self.handle_deleting(&state).await, + LifecycleState::Deleted => LifecycleState::Deleted, + }; + + if desired_lifecycle_state != state.lifecycle_state { + info!( + "Transitioning lifecycle state: {:?} -> {:?}", + state.lifecycle_state, desired_lifecycle_state, + ); + } + + if desired_lifecycle_state == LifecycleState::Deleted { + break; + } + + state.lifecycle_state = desired_lifecycle_state; + + loop { + match self + .state_manager + .set_state(&self.initial_config, state.clone()) + .await + { + Ok(_) => break, + Err(e) => { + warn!("Failed to set state: {:?}. Retrying...", e); + + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } + } + } + } + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 6d55aa780..0ef229e30 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,27 +1,29 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use near_primitives::types::AccountId; +use tokio::task::JoinHandle; use tracing_subscriber::prelude::*; use crate::handlers::block_streams::BlockStreamsHandler; use crate::handlers::data_layer::DataLayerHandler; use crate::handlers::executors::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; +use crate::lifecycle::LifecycleManager; use crate::redis::RedisClient; use crate::registry::Registry; -use crate::synchroniser::Synchroniser; mod handlers; mod indexer_config; mod indexer_state; +mod lifecycle; mod redis; mod registry; mod server; -mod synchroniser; mod utils; -const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); +const LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); async fn sleep(duration: Duration) -> anyhow::Result<()> { tokio::time::sleep(duration).await; @@ -58,18 +60,11 @@ async fn main() -> anyhow::Result<()> { let registry = Arc::new(Registry::connect(registry_contract_id.clone(), &rpc_url)); let redis_client = RedisClient::connect(&redis_url).await?; - let block_streams_handler = BlockStreamsHandler::connect(&block_streamer_url)?; + let block_streams_handler = + BlockStreamsHandler::connect(&block_streamer_url, redis_client.clone())?; let executors_handler = ExecutorsHandler::connect(&runner_url)?; let data_layer_handler = DataLayerHandler::connect(&runner_url)?; let indexer_state_manager = Arc::new(IndexerStateManager::new(redis_client.clone())); - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &indexer_state_manager, - &redis_client, - ); tokio::spawn({ let indexer_state_manager = indexer_state_manager.clone(); @@ -77,7 +72,65 @@ async fn main() -> anyhow::Result<()> { async move { server::init(grpc_port, indexer_state_manager, registry).await } }); + indexer_state_manager.migrate().await?; + + let mut lifecycle_tasks = HashMap::>::new(); + loop { - tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?; + let indexer_registry = registry.fetch().await?; + + for config in indexer_registry + .iter() + .filter(|config| config.account_id == "vuso.near") + { + if lifecycle_tasks.contains_key(&config.get_full_name()) { + continue; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name.as_str(), + "Starting lifecycle manager" + ); + + let handle = tokio::spawn({ + let indexer_state_manager = indexer_state_manager.clone(); + let config = config.clone(); + let registry = registry.clone(); + let redis_client = redis_client.clone(); + let block_streams_handler = block_streams_handler.clone(); + let data_layer_handler = data_layer_handler.clone(); + let executors_handler = executors_handler.clone(); + + async move { + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &indexer_state_manager, + &redis_client, + ); + + lifecycle_manager.run().await + } + }); + + lifecycle_tasks.insert(config.get_full_name(), handle); + } + + let finished_tasks: Vec = lifecycle_tasks + .iter() + .filter_map(|(name, task)| task.is_finished().then_some(name.clone())) + .collect(); + + for indexer_name in finished_tasks { + tracing::info!(indexer_name, "Lifecycle has finished, removing..."); + + lifecycle_tasks.remove(&indexer_name); + } + + sleep(LOOP_THROTTLE_SECONDS).await?; } } diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index d3cb87397..de891ffa3 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -169,9 +169,9 @@ impl RegistryImpl { pub async fn fetch_indexer( &self, - account_id: AccountId, - function_name: String, - ) -> anyhow::Result { + account_id: &AccountId, + function_name: &str, + ) -> anyhow::Result> { let response = self .json_rpc_client .call(RpcQueryRequest { @@ -194,10 +194,10 @@ impl RegistryImpl { .context("Failed to fetch indexer")?; if let QueryResponseKind::CallResult(call_result) = response.kind { - let indexer: registry_types::IndexerConfig = - serde_json::from_slice(&call_result.result)?; - - return Ok(IndexerConfig { + let indexer = serde_json::from_slice::>( + &call_result.result, + )? + .map(|indexer| IndexerConfig { account_id: account_id.clone(), function_name: function_name.to_string(), code: indexer.code, @@ -207,6 +207,8 @@ impl RegistryImpl { updated_at_block_height: indexer.updated_at_block_height, created_at_block_height: indexer.created_at_block_height, }); + + return Ok(indexer); } anyhow::bail!("Invalid registry response") diff --git a/coordinator/src/server/indexer_manager_service.rs b/coordinator/src/server/indexer_manager_service.rs index 36a91ca31..809fd8970 100644 --- a/coordinator/src/server/indexer_manager_service.rs +++ b/coordinator/src/server/indexer_manager_service.rs @@ -42,9 +42,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? + .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, true) @@ -78,9 +79,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? + .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, false) diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index bf362bc5d..64a5ce073 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -427,956 +427,3 @@ impl<'a> Synchroniser<'a> { Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - - use mockall::predicate::*; - use std::collections::HashMap; - - use crate::registry::IndexerRegistry; - - #[tokio::test] - async fn generates_sync_states() { - let existing_account_ids = vec![ - "account1.near".to_string(), - "account2.near".to_string(), - "account3.near".to_string(), - "account4.near".to_string(), - ]; - let new_account_ids = vec![ - "new_account1.near".to_string(), - "new_account2.near".to_string(), - ]; - let deleted_account_ids = vec![ - "deleted_account1.near".to_string(), - "deleted_account2.near".to_string(), - ]; - - let mut existing_indexer_configs: Vec = Vec::new(); - for (i, account_id) in existing_account_ids.iter().enumerate() { - for j in 1..=5 { - existing_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("existing_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut new_indexer_configs: Vec = Vec::new(); - for (i, account_id) in new_account_ids.iter().enumerate() { - for j in 1..=3 { - new_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("new_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut deleted_indexer_configs: Vec = Vec::new(); - for (i, account_id) in deleted_account_ids.iter().enumerate() { - for j in 1..=2 { - deleted_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("deleted_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut indexer_registry = IndexerRegistry::new(); - for indexer in existing_indexer_configs - .iter() - .chain(new_indexer_configs.iter()) - { - indexer_registry - .entry(indexer.account_id.clone()) - .or_default() - .insert(indexer.function_name.clone(), indexer.clone()); - } - - let mut block_streams_handler = BlockStreamsHandler::default(); - let block_streams: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| StreamInfo { - stream_id: format!("stream_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - }) - .collect(); - block_streams_handler - .expect_list() - .returning(move || Ok(block_streams.clone())); - - let mut executors_handler = ExecutorsHandler::default(); - let executors: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| ExecutorInfo { - executor_id: format!("executor_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - status: "running".to_string(), - }) - .collect(); - - executors_handler - .expect_list() - .returning(move || Ok(executors.clone())); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - let states: Vec = existing_indexer_configs - .iter() - .map(|indexer| IndexerState { - account_id: indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }) - .chain(deleted_indexer_configs.iter().map(|indexer| IndexerState { - account_id: indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - })) - .collect(); - state_manager - .expect_list() - .returning(move || Ok(states.clone())); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - let synchronisation_states = synchroniser - .generate_synchronisation_states() - .await - .unwrap(); - - let mut new_count = 0; - let mut existing_count = 0; - let mut deleted_count = 0; - - for state in &synchronisation_states { - match state { - SynchronisationState::New(_) => new_count += 1, - SynchronisationState::Existing(_, _, executor, block_stream) => { - assert!(executor.is_some(), "Executor should exist for the indexer"); - assert!( - block_stream.is_some(), - "Block stream should exist for the indexer" - ); - existing_count += 1; - } - SynchronisationState::Deleted(_, _, _) => { - deleted_count += 1; - } - } - } - - assert_eq!(new_count, 6); - assert_eq!(existing_count, 20); - assert_eq!(deleted_count, 4); - } - - mod new { - use super::*; - - #[tokio::test] - async fn triggers_data_layer_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_list().returning(|| Ok(vec![])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_list().returning(|| Ok(vec![])); - state_manager - .expect_set_provisioning() - .with(eq(config.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_provisioning_task() - .with(eq(config)) - .returning(|_| Ok("task_id".to_string())) - .once(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - } - - mod existing { - use super::*; - - #[tokio::test] - async fn waits_for_provisioning_to_complete() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let task_id = "task_id".to_string(); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioning { - task_id: task_id.clone().to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioned() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq(task_id)) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_failed_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioning_failure() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Failed)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_synced() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: config_clone.get_redis_stream_key(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - }]) - }); - block_streams_handler.expect_stop().never(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - status: "running".to_string(), - }]) - }); - executors_handler.expect_stop().never(); - executors_handler.expect_start().never(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn restarts_outdated() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - }]) - }); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - status: "running".to_string(), - }]) - }); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - executors_handler - .expect_start() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn treats_unsynced_blocks_streams_as_new() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: None, - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let redis_client = RedisClient::default(); - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_stopped_and_outdated_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version() - 1), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn resumes_stopped_and_synced_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let last_published_block = 1; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream::() - .never(); - redis_client - .expect_get_last_published_block() - .with(eq(config.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(last_published_block + 1), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn reconfigures_block_stream() { - let config_with_latest = IndexerConfig { - start_block: StartBlock::Latest, - ..IndexerConfig::default() - }; - let height = 5; - let config_with_height = IndexerConfig { - start_block: StartBlock::Height(height), - ..IndexerConfig::default() - }; - let last_published_block = 1; - let config_with_continue = IndexerConfig { - start_block: StartBlock::Continue, - ..IndexerConfig::default() - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with( - eq(last_published_block + 1), - eq(config_with_continue.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with( - eq(config_with_latest.get_registry_version()), - eq(config_with_latest.clone()), - ) - .returning(|_, _| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(height), eq(config_with_height.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config_with_latest.clone())) - .returning(|_| Ok(())) - .once(); - redis_client - .expect_clear_block_stream() - .with(eq(config_with_height.clone())) - .returning(|_| Ok(())) - .once(); - redis_client - .expect_get_last_published_block() - .with(eq(config_with_continue.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .reconfigure_block_stream(&config_with_latest) - .await - .unwrap(); - synchroniser - .reconfigure_block_stream(&config_with_height) - .await - .unwrap(); - synchroniser - .reconfigure_block_stream(&config_with_continue) - .await - .unwrap(); - } - - #[tokio::test] - async fn stops_disabled_indexers() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Provisioned, - }; - let executor = ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - status: "running".to_string(), - }; - let block_stream = StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .never(); - - let registry = Registry::default(); - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, Some(&executor), Some(&block_stream)) - .await - .unwrap(); - // Simulate second run, start/stop etc should not be called - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - } - - mod deleted { - use super::*; - - #[tokio::test] - async fn stops_block_stream_and_executor() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Deprovisioning { - task_id: "task_id".to_string(), - }, - }; - let executor = ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - status: "running".to_string(), - }; - let block_stream = StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config.account_id.to_string(), - function_name: config.function_name.clone(), - version: config.get_registry_version(), - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_delete_state().never(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Pending)); - - let registry = Registry::default(); - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream)) - .await - .unwrap(); - } - - #[tokio::test] - async fn cleans_indexer_resources() { - let config = IndexerConfig::default(); - let provisioned_state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Provisioned, - }; - let deprovisioning_state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: false, - provisioned_state: ProvisionedState::Deprovisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_deprovisioning() - .with(eq(provisioned_state.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())); - state_manager - .expect_delete_state() - .with(eq(deprovisioning_state.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_deprovisioning_task() - .with( - eq(config.clone().account_id), - eq(config.clone().function_name), - ) - .returning(|_, _| Ok("task_id".to_string())); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_del::() - .with(eq(config.get_redis_stream_key())) - .returning(|_| Ok(())) - .once(); - - let registry = Registry::default(); - let block_streams_handler = BlockStreamsHandler::default(); - let executors_handler = ExecutorsHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_deleted_indexer(&provisioned_state, None, None) - .await - .unwrap(); - synchroniser - .sync_deleted_indexer(&deprovisioning_state, None, None) - .await - .unwrap(); - } - } -} diff --git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto index 82c457f7d..51045f8c5 100644 --- a/runner-client/proto/runner.proto +++ b/runner-client/proto/runner.proto @@ -10,6 +10,15 @@ service Runner { // Lists all Runner executor rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse); + + // Get Executor info + rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo); +} + +// Get Executor request +message GetExecutorRequest { + string account_id = 1; + string function_name = 2; } // Start Executor Request diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto index 82c457f7d..51045f8c5 100644 --- a/runner/protos/runner.proto +++ b/runner/protos/runner.proto @@ -10,6 +10,15 @@ service Runner { // Lists all Runner executor rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse); + + // Get Executor info + rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo); +} + +// Get Executor request +message GetExecutorRequest { + string account_id = 1; + string function_name = 2; } // Start Executor Request diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts index bfbf9ca8d..f7a9c6dc1 100644 --- a/runner/src/server/services/runner/runner-service.test.ts +++ b/runner/src/server/services/runner/runner-service.test.ts @@ -32,6 +32,59 @@ describe('Runner gRPC Service', () => { genericIndexerConfig = new IndexerConfig(BASIC_REDIS_STREAM, BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_VERSION, BASIC_CODE, BASIC_SCHEMA, LogLevel.INFO); }); + it('get non existant executor', async () => { + const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => { + return { + indexerConfig, + executorContext: BASIC_EXECUTOR_CONTEXT + }; + }); + const service = getRunnerService(new Map(), streamHandlerType); + + await new Promise((resolve) => { + service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err) => { + expect(err).toEqual({ + code: grpc.status.NOT_FOUND, + message: `Executor for account ${BASIC_ACCOUNT_ID} and name ${BASIC_FUNCTION_NAME} does not exist` + }); + resolve(null); + }); + }); + }); + + it('gets an existing executor', async () => { + const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => { + return { + indexerConfig, + executorContext: BASIC_EXECUTOR_CONTEXT + }; + }); + const service = getRunnerService(new Map(), streamHandlerType); + const request = generateRequest(BASIC_REDIS_STREAM + '-A', BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_CODE, BASIC_SCHEMA, BASIC_VERSION); + + await new Promise((resolve, reject) => { + service.StartExecutor(request, (err) => { + if (err) reject(err); + resolve(null); + }); + }); + + await new Promise((resolve, reject) => { + service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err, response) => { + if (err) reject(err); + + expect(response).toEqual({ + executorId: BASIC_EXECUTOR_ID, + accountId: genericIndexerConfig.accountId, + functionName: genericIndexerConfig.functionName, + status: IndexerStatus.RUNNING, + version: '1' + }); + resolve(null); + }); + }); + }); + it('starts a executor with correct settings', () => { const service = getRunnerService(new Map(), genericStreamHandlerType); const mockCallback = jest.fn() as unknown as any; diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 8584d185b..0d135574f 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -8,6 +8,7 @@ import parentLogger from '../../../logger'; import { type RunnerHandlers } from '../../../generated/runner/Runner'; import { type StartExecutorResponse__Output, type StartExecutorResponse } from '../../../generated/runner/StartExecutorResponse'; import { type StartExecutorRequest__Output } from '../../../generated/runner/StartExecutorRequest'; +import { type GetExecutorRequest__Output } from '../../../generated/runner/GetExecutorRequest'; import { type StopExecutorRequest__Output } from '../../../generated/runner/StopExecutorRequest'; import { type StopExecutorResponse__Output, type StopExecutorResponse } from '../../../generated/runner/StopExecutorResponse'; import { type ListExecutorsRequest__Output } from '../../../generated/runner/ListExecutorsRequest'; @@ -19,6 +20,28 @@ export function getRunnerService ( StreamHandlerType: typeof StreamHandler = StreamHandler ): RunnerHandlers { const RunnerService: RunnerHandlers = { + GetExecutor (call: ServerUnaryCall, callback: sendUnaryData): void { + const { accountId, functionName } = call.request; + + const executorEntry = Array.from(executors.entries()).find(([_id, executor]) => executor.indexerConfig.accountId === accountId && executor.indexerConfig.functionName === functionName); + + if (executorEntry) { + const [executorId, executor] = executorEntry; + callback(null, { + executorId, + accountId: executor.indexerConfig.accountId, + functionName: executor.indexerConfig.functionName, + version: executor.indexerConfig.version.toString(), + status: executor.executorContext.status + }); + } else { + const notFoundError = { + code: grpc.status.NOT_FOUND, + message: `Executor for account ${accountId} and name ${functionName} does not exist` + }; + callback(notFoundError, null); + } + }, StartExecutor (call: ServerUnaryCall, callback: sendUnaryData): void { // Validate request const validationResult = validateStartExecutorRequest(call.request);