From 61036b2edc038a7815dc3105d1f55a35b8640324 Mon Sep 17 00:00:00 2001
From: Morgan McCauley
Date: Thu, 18 Jul 2024 13:56:34 +1200
Subject: [PATCH] refactor: Spawn dedicated control loops per Indexer (#866)

This PR introduces dedicated/self-contained control loops per Indexer, replacing the single/combined control loop. The motivation for this change is described in #811; you can read more about it there. Overall, there is lots of clean-up still to be done, but I wanted to get this out the door as quickly as possible so as not to block the features that will build on top of it. I've discussed the major concerns below.

## `LifecycleManager`

These dedicated control loops are managed by the `LifecycleManager` struct. This is a state machine which progresses the Indexer through different states depending on the context. The different states and their transitions are described in the `LifecycleState` enum:

```rust
/// Represents the different lifecycle states of an Indexer
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum LifecycleState {
    /// Pre-requisite resources, i.e. Data Layer, are being created.
    ///
    /// Transitions:
    /// - `Running` on success
    /// - `Repairing` on Data Layer provisioning failure
    #[default]
    Initializing,
    /// Indexer is functional, Block Stream and Executors are continuously monitored to ensure
    /// they are running the latest version of the Indexer.
    ///
    /// Transitions:
    /// - `Stopping` if suspended
    /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a
    ///   retry
    /// - `Running` on success
    Running,
    /// Indexer is being stopped, Block Stream and Executors are being stopped.
    ///
    /// Transitions:
    /// - `Stopping` on failure, triggering a retry
    /// - `Stopped` on success
    Stopping,
    /// Indexer is stopped, Block Stream and Executors are not running.
    ///
    /// Transitions:
    /// - `Running` if unsuspended
    Stopped,
    /// Indexer is in a bad state; currently requires manual intervention, but should eventually
    /// self-heal. For now, this is a dead-end state.
    ///
    /// Transitions:
    /// - `Repairing` continuously
    Repairing, // TODO Add `error` to enable reparation
    /// Indexer is being deleted, all resources are being cleaned up
    ///
    /// Transitions:
    /// - `Deleting` on failure, triggering a retry
    /// - `Deleted` on success
    Deleting,
    /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit
    Deleted,
}
```

The logic in this `struct` is intentionally light: it triggers the high-level actions required within each state, and then returns the next desired state. Most of the "doing" logic has been encapsulated in the other related `structs`, as discussed below.

The lifecycle state is stored in Redis so that the Indexer can pick up where it left off. A migration has been added to accommodate this new field, which replaces the existing `provisioned_state` field.

## `Handler`s

Previously, the "handlers", e.g. `BlockStreamsHandler`, were lightweight `structs` which wrapped the gRPC client/methods. In this PR, I've moved all "synchronisation" logic into these structs. So rather than calling e.g. the `data_layer_handler.start_provisioning_task()` method, we can call `ensure_provisioned()`, which manages all the related logic. I feel this improves encapsulation, and allows the `LifecycleManager` to stay light.
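To make the end-to-end flow concrete, each `LifecycleManager` tick essentially reduces to the following. This is a condensed, illustrative version of the `run` loop and `handle_*` methods in `coordinator/src/lifecycle.rs` below (logging, throttling, and state persistence omitted) — all names come from the diff, but this is not the verbatim code:

```rust
// Condensed sketch of one control-loop tick: act on the current lifecycle
// state and return the next desired one.
async fn tick(&self, config: Option<&IndexerConfig>, state: &mut IndexerState) -> LifecycleState {
    if let LifecycleState::Deleting = state.lifecycle_state {
        // Tear down the Block Stream, Executor, stored state, Redis stream,
        // and Data Layer
        return self.handle_deleting(state).await;
    }

    // Otherwise, a deleted registry entry pushes the Indexer towards clean-up
    let Some(config) = config else {
        return LifecycleState::Deleting;
    };

    match state.lifecycle_state {
        // Create pre-requisite resources, i.e. the Data Layer
        LifecycleState::Initializing => {
            match self.data_layer_handler.ensure_provisioned(config).await {
                Ok(()) => LifecycleState::Running,
                Err(_) => LifecycleState::Repairing,
            }
        }
        // Keep the Block Stream and Executor on the latest registry version;
        // staying in `Running` after a failure is what triggers the retry
        LifecycleState::Running if state.enabled => {
            if self
                .block_streams_handler
                .synchronise_block_stream(config, state.block_stream_synced_at)
                .await
                .is_ok()
            {
                state.block_stream_synced_at = Some(config.get_registry_version());
                let _ = self.executors_handler.synchronise_executor(config).await;
            }
            LifecycleState::Running
        }
        // Suspended Indexers are wound down
        LifecycleState::Running => LifecycleState::Stopping,
        // `Stopping`, `Stopped` and `Repairing` follow the same pattern via
        // `stop_if_needed` and friends
        ref other => other.clone(),
    }
}
```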
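I've had to remove `automock`, so we don't have mocked versions of the handlers right now. Cloning mocked versions is tricky and requires manual mocking; rather than bloat this PR, I've left that out. To show the direction I have in mind, here's a rough sketch of how the handlers could become mockable again by hiding the raw gRPC calls behind a trait — none of this is in the PR, and the `BlockStreamerRpc` trait, its methods, and the free-function shape are all hypothetical:

```rust
use crate::handlers::block_streams::StreamInfo;
use crate::indexer_config::IndexerConfig;

// Hypothetical seam: a thin trait over the raw gRPC calls. `automock` can
// target the trait, so the concrete tonic client never needs to be cloned
// in tests.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait BlockStreamerRpc: Send + Sync {
    async fn get_stream(
        &self,
        account_id: String,
        function_name: String,
    ) -> anyhow::Result<Option<StreamInfo>>;
    async fn start_stream(&self, height: u64, config: &IndexerConfig) -> anyhow::Result<()>;
    async fn stop_stream(&self, stream_id: String) -> anyhow::Result<()>;
}

// The "sync" logic then depends only on the trait, so unit tests can drive it
// with the generated `MockBlockStreamerRpc`. (Simplified: this ignores
// `StartBlock` handling and stream continuation.)
pub async fn synchronise_block_stream(
    rpc: &impl BlockStreamerRpc,
    config: &IndexerConfig,
) -> anyhow::Result<()> {
    match rpc
        .get_stream(config.account_id.to_string(), config.function_name.clone())
        .await?
    {
        // Already running the registered version — nothing to do
        Some(stream) if stream.version == config.get_registry_version() => Ok(()),
        // Outdated — restart on the new version
        Some(stream) => {
            rpc.stop_stream(stream.stream_id).await?;
            rpc.start_stream(config.get_registry_version(), config).await
        }
        // Not running — start fresh
        None => rpc.start_stream(config.get_registry_version(), config).await,
    }
}
```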
That is, I'll eventually separate the "sync" logic from the "client" logic, so that the latter can be easily mocked, and the sync logic covered by unit tests.

Additionally, I've added `get` methods for both the Block Streamer and Executors RPC, as listing is no longer convenient now that we manage Indexers individually. The getters take `account_id` and `function_name`, as opposed to IDs. I'm considering moving away from IDs entirely, since the only way to obtain one is via the list methods, which isn't helpful. Right now it's somewhat of a transitional state.
---
 block-streamer/proto/block_streamer.proto     |  11 +
 .../src/server/block_streamer_service.rs      |  95 +-
 coordinator/src/handlers/block_streams.rs     | 176 +++-
 coordinator/src/handlers/data_layer.rs        |  92 +-
 coordinator/src/handlers/executors.rs         |  84 +-
 coordinator/src/indexer_state.rs              | 109 +-
 coordinator/src/lifecycle.rs                  | 343 +++++++
 coordinator/src/main.rs                       |  79 +-
 coordinator/src/registry.rs                   |  16 +-
 .../src/server/indexer_manager_service.rs     |  10 +-
 coordinator/src/synchroniser.rs               | 953 ------------------
 runner-client/proto/runner.proto              |   9 +
 runner/protos/runner.proto                    |   9 +
 .../services/runner/runner-service.test.ts    |  53 +
 .../server/services/runner/runner-service.ts  |  23 +
 15 files changed, 995 insertions(+), 1067 deletions(-)
 create mode 100644 coordinator/src/lifecycle.rs

diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto
index 5d1e0517a..92d3a2c90 100644
--- a/block-streamer/proto/block_streamer.proto
+++ b/block-streamer/proto/block_streamer.proto
@@ -12,6 +12,17 @@ service BlockStreamer {
   // Lists all current BlockStream processes
   rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);
+
+  // Get info for an existing BlockStream process
+  rpc GetStream (GetStreamRequest) returns (StreamInfo);
+}
+
+// Request message for getting a BlockStream
+message GetStreamRequest {
+  // Account ID which the indexer is defined under
+  string account_id = 1;
+  // Name of the indexer
+  string function_name = 2;
 }
 
 // Request message for starting a BlockStream
diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs
index 94ccb22aa..88d426b9c 100644
--- a/block-streamer/src/server/block_streamer_service.rs
+++ b/block-streamer/src/server/block_streamer_service.rs
@@ -59,6 +59,38 @@ impl BlockStreamerService {
 
 #[tonic::async_trait]
 impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
+    #[tracing::instrument(skip(self))]
+    async fn get_stream(
+        &self,
+        request: Request<GetStreamRequest>,
+    ) -> Result<Response<StreamInfo>, Status> {
+        let request = request.into_inner();
+
+        let lock = self.block_streams.lock().map_err(|err| {
+            tracing::error!(?err, "Failed to acquire `block_streams` lock");
+            tonic::Status::internal("Failed to acquire `block_streams` lock")
+        })?;
+
+        let stream_entry = lock.iter().find(|(_, block_stream)| {
+            block_stream.indexer_config.account_id == request.account_id
+                && block_stream.indexer_config.function_name == request.function_name
+        });
+
+        if let Some((stream_id, stream)) = stream_entry {
+            Ok(Response::new(StreamInfo {
+                stream_id: stream_id.to_string(),
+                account_id: stream.indexer_config.account_id.to_string(),
+                function_name: stream.indexer_config.function_name.to_string(),
+                version: stream.version,
+            }))
+        } else {
+            Err(Status::not_found(format!(
+                "Block Stream for account {} and name {} does not exist",
+                request.account_id, request.function_name
+            )))
+        }
+    }
+
     #[tracing::instrument(skip(self))]
     async fn start_stream(
         &self,
         request: Request<StartStreamRequest>,
     ) -> Result<Response<StartStreamResponse>, Status> {
@@
-171,7 +203,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic &self, _request: Request, ) -> Result, Status> { - let lock = self.block_streams.lock().unwrap(); + let lock = self.block_streams.lock().map_err(|err| { + tracing::error!(?err, "Failed to acquire `block_streams` lock"); + tonic::Status::internal("Failed to acquire `block_streams` lock") + })?; + let block_streams: Vec = lock .values() .map(|block_stream| StreamInfo { @@ -234,6 +270,63 @@ mod tests { ) } + #[tokio::test] + async fn get_existing_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + block_streamer_service + .start_stream(Request::new(StartStreamRequest { + start_block_height: 0, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + version: 0, + redis_stream: "stream".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: 1, + })), + })) + .await + .unwrap(); + + let stream = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await + .unwrap(); + + assert_eq!( + stream.into_inner().stream_id, + "16210176318434468568".to_string() + ); + } + + #[tokio::test] + async fn get_non_existant_block_stream() { + let block_streamer_service = create_block_streamer_service(); + + { + let lock = block_streamer_service.get_block_streams_lock().unwrap(); + assert_eq!(lock.len(), 0); + } + + let stream_response = block_streamer_service + .get_stream(Request::new(GetStreamRequest { + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + })) + .await; + + assert_eq!(stream_response.err().unwrap().code(), tonic::Code::NotFound); + } + #[tokio::test] async fn starts_a_block_stream() { let block_streamer_service = create_block_streamer_service(); diff --git a/coordinator/src/handlers/block_streams.rs b/coordinator/src/handlers/block_streams.rs index f88a412d5..bc34c000b 100644 --- a/coordinator/src/handlers/block_streams.rs +++ b/coordinator/src/handlers/block_streams.rs @@ -5,34 +5,36 @@ pub use block_streamer::StreamInfo; use anyhow::Context; use block_streamer::block_streamer_client::BlockStreamerClient; use block_streamer::{ - start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest, - StartStreamRequest, Status, StopStreamRequest, + start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest, + ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest, }; +use near_primitives::types::AccountId; +use registry_types::StartBlock; use tonic::transport::channel::Channel; use tonic::Request; use crate::indexer_config::IndexerConfig; -use crate::redis::KeyProvider; +use crate::redis::{KeyProvider, RedisClient}; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use BlockStreamsHandlerImpl as BlockStreamsHandler; -#[cfg(test)] -pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler; - -pub struct BlockStreamsHandlerImpl { +#[derive(Clone)] +pub struct BlockStreamsHandler { client: BlockStreamerClient, + redis_client: RedisClient, } #[cfg_attr(test, mockall::automock)] -impl BlockStreamsHandlerImpl { - pub fn connect(block_streamer_url: &str) -> anyhow::Result { +impl BlockStreamsHandler { + pub fn connect(block_streamer_url: &str, 
redis_client: RedisClient) -> anyhow::Result { let channel = Channel::from_shared(block_streamer_url.to_string()) .context("Block Streamer URL is invalid")? .connect_lazy(); let client = BlockStreamerClient::new(channel); - Ok(Self { client }) + Ok(Self { + client, + redis_client, + }) } pub async fn list(&self) -> anyhow::Result> { @@ -79,6 +81,26 @@ impl BlockStreamsHandlerImpl { .into() } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetStreamRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self.client.clone().get_stream(Request::new(request)).await { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get stream for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start( &self, start_block_height: u64, @@ -139,4 +161,134 @@ impl BlockStreamsHandlerImpl { Ok(()) } + + async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + if matches!( + config.start_block, + StartBlock::Latest | StartBlock::Height(..) + ) { + self.redis_client.clear_block_stream(config).await?; + } + + let height = match config.start_block { + StartBlock::Latest => config.get_registry_version(), + StartBlock::Height(height) => height, + StartBlock::Continue => self.get_continuation_block_height(config).await?, + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await?; + + Ok(()) + } + + async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = match config.start_block { + StartBlock::Height(height) => height, + StartBlock::Latest => config.get_registry_version(), + StartBlock::Continue => { + tracing::warn!( + "Attempted to start new Block Stream with CONTINUE, using LATEST instead" + ); + config.get_registry_version() + } + }; + + tracing::info!( + start_block = ?config.start_block, + height, + "Starting block stream" + ); + + self.start(height, config).await + } + + async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result { + let height = self + .redis_client + .get_last_published_block(config) + .await? 
+ .map(|height| height + 1) + .unwrap_or_else(|| { + tracing::warn!( + "Failed to get continuation block height, using registry version instead" + ); + + config.get_registry_version() + }); + + Ok(height) + } + + async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let height = self.get_continuation_block_height(config).await?; + + tracing::info!(height, "Resuming block stream"); + + self.start(height, config).await?; + + Ok(()) + } + + pub async fn synchronise_block_stream( + &self, + config: &IndexerConfig, + previous_sync_version: Option, + ) -> anyhow::Result<()> { + let block_stream = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(block_stream) = block_stream { + if block_stream.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + previous_version = block_stream.version, + "Stopping outdated block stream" + ); + + self.stop(block_stream.stream_id.clone()).await?; + + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.is_none() { + self.start_new_block_stream(config).await?; + + return Ok(()); + } + + if previous_sync_version.unwrap() != config.get_registry_version() { + self.reconfigure_block_stream(config).await?; + + return Ok(()); + } + + self.resume_block_stream(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(block_stream) = self.get(account_id, function_name).await? { + tracing::info!("Stopping block stream"); + + self.stop(block_stream.stream_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 68537b3c7..3e2df54dc 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -8,23 +8,18 @@ use anyhow::Context; use runner::data_layer::data_layer_client::DataLayerClient; use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest}; use tonic::transport::channel::Channel; -use tonic::Request; +use tonic::{Request, Status}; use crate::indexer_config::IndexerConfig; -#[cfg(not(test))] -pub use DataLayerHandlerImpl as DataLayerHandler; -#[cfg(test)] -pub use MockDataLayerHandlerImpl as DataLayerHandler; - type TaskId = String; -pub struct DataLayerHandlerImpl { +#[derive(Clone)] +pub struct DataLayerHandler { client: DataLayerClient, } -#[cfg_attr(test, mockall::automock)] -impl DataLayerHandlerImpl { +impl DataLayerHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? 
@@ -37,7 +32,7 @@ impl DataLayerHandlerImpl { pub async fn start_provisioning_task( &self, indexer_config: &IndexerConfig, - ) -> anyhow::Result { + ) -> Result { let request = ProvisionRequest { account_id: indexer_config.account_id.to_string(), function_name: indexer_config.function_name.clone(), @@ -98,4 +93,81 @@ impl DataLayerHandlerImpl { Ok(status) } + + pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer"); + + let start_task_result = self.start_provisioning_task(indexer_config).await; + + if let Err(error) = start_task_result { + // Already provisioned + if error.code() == tonic::Code::FailedPrecondition { + return Ok(()); + } + + return Err(error.into()); + } + + let task_id = start_task_result.unwrap(); + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?indexer_config.account_id, + ?indexer_config.function_name, + "Still waiting for provisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } + + pub async fn ensure_deprovisioned( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + tracing::info!(?account_id, ?function_name, "Deprovisioning data layer"); + + let task_id = self + .start_deprovisioning_task(account_id.clone(), function_name.clone()) + .await?; + + let mut iterations = 0; + let delay_seconds = 1; + + loop { + if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; + + iterations += 1; + + if iterations * delay_seconds % 60 == 0 { + tracing::warn!( + ?account_id, + ?function_name, + "Still waiting for deprovisioning to complete after {} seconds", + iterations * delay_seconds + ); + } + } + + Ok(()) + } } diff --git a/coordinator/src/handlers/executors.rs b/coordinator/src/handlers/executors.rs index 686164048..01c9cc4be 100644 --- a/coordinator/src/handlers/executors.rs +++ b/coordinator/src/handlers/executors.rs @@ -1,10 +1,11 @@ #![cfg_attr(test, allow(dead_code))] +use near_primitives::types::AccountId; pub use runner::ExecutorInfo; use anyhow::Context; use runner::runner_client::RunnerClient; -use runner::{ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; +use runner::{GetExecutorRequest, ListExecutorsRequest, StartExecutorRequest, StopExecutorRequest}; use tonic::transport::channel::Channel; use tonic::Request; @@ -12,17 +13,12 @@ use crate::indexer_config::IndexerConfig; use crate::redis::KeyProvider; use crate::utils::exponential_retry; -#[cfg(not(test))] -pub use ExecutorsHandlerImpl as ExecutorsHandler; -#[cfg(test)] -pub use MockExecutorsHandlerImpl as ExecutorsHandler; - -pub struct ExecutorsHandlerImpl { +#[derive(Clone)] +pub struct ExecutorsHandler { client: RunnerClient, } -#[cfg_attr(test, mockall::automock)] -impl ExecutorsHandlerImpl { +impl ExecutorsHandler { pub fn connect(runner_url: &str) -> anyhow::Result { let channel = Channel::from_shared(runner_url.to_string()) .context("Runner URL is invalid")? 
@@ -50,6 +46,31 @@ impl ExecutorsHandlerImpl { .await } + pub async fn get( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result> { + let request = GetExecutorRequest { + account_id: account_id.to_string(), + function_name: function_name.clone(), + }; + + match self + .client + .clone() + .get_executor(Request::new(request)) + .await + { + Ok(response) => Ok(Some(response.into_inner())), + Err(status) if status.code() == tonic::Code::NotFound => Ok(None), + Err(err) => Err(err).context(format!( + "Failed to get executor for account {} and name {}", + account_id, function_name + )), + } + } + pub async fn start(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { let request = StartExecutorRequest { code: indexer_config.code.clone(), @@ -97,4 +118,49 @@ impl ExecutorsHandlerImpl { Ok(()) } + + pub async fn synchronise_executor(&self, config: &IndexerConfig) -> anyhow::Result<()> { + let executor = self + .get(config.account_id.clone(), config.function_name.clone()) + .await?; + + if let Some(executor) = executor { + if executor.version == config.get_registry_version() { + return Ok(()); + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = executor.version, + "Stopping outdated executor" + ); + + self.stop(executor.executor_id).await?; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name, + version = config.get_registry_version(), + "Starting executor" + ); + + self.start(config).await?; + + Ok(()) + } + + pub async fn stop_if_needed( + &self, + account_id: AccountId, + function_name: String, + ) -> anyhow::Result<()> { + if let Some(executor) = self.get(account_id, function_name).await? { + tracing::info!("Stopping executor"); + self.stop(executor.executor_id).await?; + } + + Ok(()) + } } diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 2539e06a2..1200648f0 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -4,6 +4,7 @@ use anyhow::Context; use near_primitives::types::AccountId; use crate::indexer_config::IndexerConfig; +use crate::lifecycle::LifecycleState; use crate::redis::{KeyProvider, RedisClient}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] @@ -16,7 +17,7 @@ pub enum ProvisionedState { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct IndexerState { +pub struct OldIndexerState { pub account_id: AccountId, pub function_name: String, pub block_stream_synced_at: Option, @@ -24,6 +25,15 @@ pub struct IndexerState { pub provisioned_state: ProvisionedState, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct IndexerState { + pub account_id: AccountId, + pub function_name: String, + pub block_stream_synced_at: Option, + pub enabled: bool, + pub lifecycle_state: LifecycleState, +} + impl KeyProvider for IndexerState { fn account_id(&self) -> String { self.account_id.to_string() @@ -49,13 +59,46 @@ impl IndexerStateManagerImpl { Self { redis_client } } + pub async fn migrate(&self) -> anyhow::Result<()> { + let raw_states = self.redis_client.list_indexer_states().await?; + + for raw_state in raw_states { + if let Ok(state) = serde_json::from_str::(&raw_state) { + tracing::info!( + "{}/{} already migrated, skipping", + state.account_id, + state.function_name + ); + continue; + } + + tracing::info!("Migrating {}", raw_state); + + let old_state: 
OldIndexerState = serde_json::from_str(&raw_state)?; + + let state = IndexerState { + account_id: old_state.account_id, + function_name: old_state.function_name, + block_stream_synced_at: old_state.block_stream_synced_at, + enabled: old_state.enabled, + lifecycle_state: LifecycleState::Running, + }; + + self.redis_client + .set(state.get_state_key(), serde_json::to_string(&state)?) + .await?; + } + + Ok(()) + } + fn get_default_state(&self, indexer_config: &IndexerConfig) -> IndexerState { IndexerState { account_id: indexer_config.account_id.clone(), function_name: indexer_config.function_name.clone(), block_stream_synced_at: None, enabled: true, - provisioned_state: ProvisionedState::Unprovisioned, + lifecycle_state: LifecycleState::default(), } } @@ -76,10 +119,12 @@ impl IndexerStateManagerImpl { } pub async fn delete_state(&self, indexer_state: &IndexerState) -> anyhow::Result<()> { + tracing::info!("Deleting state"); + self.redis_client.delete_indexer_state(indexer_state).await } - async fn set_state( + pub async fn set_state( &self, indexer_config: &IndexerConfig, state: IndexerState, @@ -101,58 +146,6 @@ impl IndexerStateManagerImpl { Ok(()) } - pub async fn set_deprovisioning( - &self, - indexer_state: &IndexerState, - task_id: String, - ) -> anyhow::Result<()> { - let mut state = indexer_state.clone(); - - state.provisioned_state = ProvisionedState::Deprovisioning { task_id }; - - self.redis_client - .set(state.get_state_key(), serde_json::to_string(&state)?) - .await?; - - Ok(()) - } - - pub async fn set_provisioning( - &self, - indexer_config: &IndexerConfig, - task_id: String, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id }; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - - pub async fn set_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Provisioned; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_provisioning_failure( - &self, - indexer_config: &IndexerConfig, - ) -> anyhow::Result<()> { - let mut indexer_state = self.get_state(indexer_config).await?; - - indexer_state.provisioned_state = ProvisionedState::Failed; - - self.set_state(indexer_config, indexer_state).await?; - - Ok(()) - } - pub async fn set_enabled( &self, indexer_config: &IndexerConfig, @@ -194,7 +187,7 @@ mod tests { let mut mock_redis_client = RedisClient::default(); mock_redis_client .expect_list_indexer_states() - .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "provisioned_state": "Provisioned" }).to_string()])) + .returning(|| Ok(vec![serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string()])) .once(); mock_redis_client .expect_list_indexer_states() @@ -229,7 +222,7 @@ mod tests { .with(predicate::eq(indexer_config.clone())) .returning(|_| { Ok(Some( - serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "provisioned_state": "Provisioned" }) + serde_json::json!({ "account_id": "morgs.near", "function_name": "test", "block_stream_synced_at": 123, "enabled": true, "lifecycle_state": "Initializing" 
})
                    .to_string(),
                ))
            });
@@ -237,7 +230,7 @@
            .expect_set_indexer_state::<IndexerConfig>()
            .with(
                predicate::always(),
-                predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"provisioned_state\":\"Provisioned\"}".to_string()),
+                predicate::eq("{\"account_id\":\"morgs.near\",\"function_name\":\"test\",\"block_stream_synced_at\":123,\"enabled\":false,\"lifecycle_state\":\"Initializing\"}".to_string()),
            )
            .returning(|_, _| Ok(()))
            .once();
diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs
new file mode 100644
index 000000000..70c9f7b38
--- /dev/null
+++ b/coordinator/src/lifecycle.rs
@@ -0,0 +1,343 @@
+use tracing::{info, warn};
+
+use crate::handlers::block_streams::BlockStreamsHandler;
+use crate::handlers::data_layer::DataLayerHandler;
+use crate::handlers::executors::ExecutorsHandler;
+use crate::indexer_config::IndexerConfig;
+use crate::indexer_state::{IndexerState, IndexerStateManager};
+use crate::redis::{KeyProvider, RedisClient};
+use crate::registry::Registry;
+
+const LOOP_THROTTLE_MS: u64 = 1000;
+
+/// Represents the different lifecycle states of an Indexer
+#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub enum LifecycleState {
+    /// Pre-requisite resources, i.e. Data Layer, are being created.
+    ///
+    /// Transitions:
+    /// - `Running` on success
+    /// - `Repairing` on Data Layer provisioning failure
+    #[default]
+    Initializing,
+    /// Indexer is functional, Block Stream and Executors are continuously monitored to ensure
+    /// they are running the latest version of the Indexer.
+    ///
+    /// Transitions:
+    /// - `Stopping` if suspended
+    /// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a
+    ///   retry
+    /// - `Running` on success
+    Running,
+    /// Indexer is being stopped, Block Stream and Executors are being stopped.
+    ///
+    /// Transitions:
+    /// - `Stopping` on failure, triggering a retry
+    /// - `Stopped` on success
+    Stopping,
+    /// Indexer is stopped, Block Stream and Executors are not running.
+    ///
+    /// Transitions:
+    /// - `Running` if unsuspended
+    Stopped,
+    /// Indexer is in a bad state; currently requires manual intervention, but should eventually
+    /// self-heal. For now, this is a dead-end state.
+    ///
+    /// Transitions:
+    /// - `Repairing` continuously
+    Repairing, // TODO Add `error` to enable reparation
+    /// Indexer is being deleted, all resources are being cleaned up
+    ///
+    /// Transitions:
+    /// - `Deleting` on failure, triggering a retry
+    /// - `Deleted` on success
+    Deleting,
+    /// Indexer is deleted, all resources are cleaned up, lifecycle manager will exit
+    Deleted,
+}
+
+pub struct LifecycleManager<'a> {
+    initial_config: IndexerConfig,
+    block_streams_handler: &'a BlockStreamsHandler,
+    executors_handler: &'a ExecutorsHandler,
+    data_layer_handler: &'a DataLayerHandler,
+    registry: &'a Registry,
+    state_manager: &'a IndexerStateManager,
+    redis_client: &'a RedisClient,
+}
+
+impl<'a> LifecycleManager<'a> {
+    pub fn new(
+        initial_config: IndexerConfig,
+        block_streams_handler: &'a BlockStreamsHandler,
+        executors_handler: &'a ExecutorsHandler,
+        data_layer_handler: &'a DataLayerHandler,
+        registry: &'a Registry,
+        state_manager: &'a IndexerStateManager,
+        redis_client: &'a RedisClient,
+    ) -> Self {
+        Self {
+            initial_config,
+            block_streams_handler,
+            executors_handler,
+            data_layer_handler,
+            registry,
+            state_manager,
+            redis_client,
+        }
+    }
+
+    #[tracing::instrument(name = "initializing", skip_all)]
+    async fn handle_initializing(
+        &self,
+        config: Option<&IndexerConfig>,
+        _state: &IndexerState,
+    ) -> LifecycleState {
+        if config.is_none() {
+            return LifecycleState::Deleting;
+        }
+
+        let config = config.unwrap();
+
+        if self
+            .data_layer_handler
+            .ensure_provisioned(config)
+            .await
+            .is_err()
+        {
+            return LifecycleState::Repairing;
+        }
+
+        LifecycleState::Running
+    }
+
+    #[tracing::instrument(name = "running", skip_all)]
+    async fn handle_running(
+        &self,
+        config: Option<&IndexerConfig>,
+        state: &mut IndexerState,
+    ) -> LifecycleState {
+        if config.is_none() {
+            return LifecycleState::Deleting;
+        }
+
+        let config = config.unwrap();
+
+        if !state.enabled {
+            return LifecycleState::Stopping;
+        }
+
+        if let Err(error) = self
+            .block_streams_handler
+            .synchronise_block_stream(config, state.block_stream_synced_at)
+            .await
+        {
+            warn!(?error, "Failed to synchronise block stream, retrying...");
+
+            return LifecycleState::Running;
+        }
+
+        state.block_stream_synced_at = Some(config.get_registry_version());
+
+        if let Err(error) = self.executors_handler.synchronise_executor(config).await {
+            warn!(?error, "Failed to synchronise executor, retrying...");
+
+            return LifecycleState::Running;
+        }
+
+        LifecycleState::Running
+    }
+
+    #[tracing::instrument(name = "stopping", skip_all)]
+    async fn handle_stopping(&self, config: Option<&IndexerConfig>) -> LifecycleState {
+        if config.is_none() {
+            return LifecycleState::Deleting;
+        }
+
+        let config = config.unwrap();
+
+        if let Err(error) = self
+            .block_streams_handler
+            .stop_if_needed(config.account_id.clone(), config.function_name.clone())
+            .await
+        {
+            warn!(?error, "Failed to stop block stream, retrying...");
+            return LifecycleState::Stopping;
+        }
+
+        if let Err(error) = self
+            .executors_handler
+            .stop_if_needed(config.account_id.clone(), config.function_name.clone())
+            .await
+        {
+            warn!(?error, "Failed to stop executor, retrying...");
+            return LifecycleState::Stopping;
+        }
+
+        LifecycleState::Stopped
+    }
+
+    #[tracing::instrument(name = "stopped", skip_all)]
+    async fn handle_stopped(
+        &self,
+        config: Option<&IndexerConfig>,
+        state: &IndexerState,
+    ) -> LifecycleState {
+        if config.is_none() {
+            return LifecycleState::Deleting;
+        }
+
+        // TODO Transition to `Running` on config
update + + if state.enabled { + return LifecycleState::Running; + } + + LifecycleState::Stopped + } + + #[tracing::instrument(name = "repairing", skip_all)] + async fn handle_repairing( + &self, + config: Option<&IndexerConfig>, + _state: &IndexerState, + ) -> LifecycleState { + if config.is_none() { + return LifecycleState::Deleting; + } + + // TODO Add more robust error handling, for now just stop + LifecycleState::Repairing + } + + #[tracing::instrument(name = "deleting", skip_all)] + async fn handle_deleting(&self, state: &IndexerState) -> LifecycleState { + if let Err(error) = self + .block_streams_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop block stream"); + } + + if let Err(error) = self + .executors_handler + .stop_if_needed(state.account_id.clone(), state.function_name.clone()) + .await + { + warn!(?error, "Failed to stop executor"); + } + + if self.state_manager.delete_state(state).await.is_err() { + // Retry + return LifecycleState::Deleting; + } + + info!("Clearing block stream"); + + if self + .redis_client + .del(state.get_redis_stream_key()) + .await + .is_err() + { + // Retry + return LifecycleState::Deleting; + } + + if self + .data_layer_handler + .ensure_deprovisioned(state.account_id.clone(), state.function_name.clone()) + .await + .is_err() + { + return LifecycleState::Deleted; + } + + LifecycleState::Deleted + } + + #[tracing::instrument( + name = "lifecycle_manager", + skip(self), + fields( + account_id = self.initial_config.account_id.as_str(), + function_name = self.initial_config.function_name.as_str() + ) + )] + pub async fn run(&self) { + let mut first_iteration = true; + + loop { + tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await; + + let config = match self + .registry + .fetch_indexer( + &self.initial_config.account_id, + &self.initial_config.function_name, + ) + .await + { + Ok(config) => config, + Err(error) => { + warn!(?error, "Failed to fetch config"); + continue; + } + }; + + let mut state = match self.state_manager.get_state(&self.initial_config).await { + Ok(state) => state, + Err(error) => { + warn!(?error, "Failed to get state"); + continue; + } + }; + + if first_iteration { + info!("Initial lifecycle state: {:?}", state.lifecycle_state,); + first_iteration = false; + } + + let desired_lifecycle_state = match state.lifecycle_state { + LifecycleState::Initializing => { + self.handle_initializing(config.as_ref(), &state).await + } + LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await, + LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await, + LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await, + LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await, + LifecycleState::Deleting => self.handle_deleting(&state).await, + LifecycleState::Deleted => LifecycleState::Deleted, + }; + + if desired_lifecycle_state != state.lifecycle_state { + info!( + "Transitioning lifecycle state: {:?} -> {:?}", + state.lifecycle_state, desired_lifecycle_state, + ); + } + + if desired_lifecycle_state == LifecycleState::Deleted { + break; + } + + state.lifecycle_state = desired_lifecycle_state; + + loop { + match self + .state_manager + .set_state(&self.initial_config, state.clone()) + .await + { + Ok(_) => break, + Err(e) => { + warn!("Failed to set state: {:?}. 
Retrying...", e); + + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } + } + } + } + } +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 6d55aa780..0ef229e30 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,27 +1,29 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use near_primitives::types::AccountId; +use tokio::task::JoinHandle; use tracing_subscriber::prelude::*; use crate::handlers::block_streams::BlockStreamsHandler; use crate::handlers::data_layer::DataLayerHandler; use crate::handlers::executors::ExecutorsHandler; use crate::indexer_state::IndexerStateManager; +use crate::lifecycle::LifecycleManager; use crate::redis::RedisClient; use crate::registry::Registry; -use crate::synchroniser::Synchroniser; mod handlers; mod indexer_config; mod indexer_state; +mod lifecycle; mod redis; mod registry; mod server; -mod synchroniser; mod utils; -const CONTROL_LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); +const LOOP_THROTTLE_SECONDS: Duration = Duration::from_secs(1); async fn sleep(duration: Duration) -> anyhow::Result<()> { tokio::time::sleep(duration).await; @@ -58,18 +60,11 @@ async fn main() -> anyhow::Result<()> { let registry = Arc::new(Registry::connect(registry_contract_id.clone(), &rpc_url)); let redis_client = RedisClient::connect(&redis_url).await?; - let block_streams_handler = BlockStreamsHandler::connect(&block_streamer_url)?; + let block_streams_handler = + BlockStreamsHandler::connect(&block_streamer_url, redis_client.clone())?; let executors_handler = ExecutorsHandler::connect(&runner_url)?; let data_layer_handler = DataLayerHandler::connect(&runner_url)?; let indexer_state_manager = Arc::new(IndexerStateManager::new(redis_client.clone())); - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &indexer_state_manager, - &redis_client, - ); tokio::spawn({ let indexer_state_manager = indexer_state_manager.clone(); @@ -77,7 +72,65 @@ async fn main() -> anyhow::Result<()> { async move { server::init(grpc_port, indexer_state_manager, registry).await } }); + indexer_state_manager.migrate().await?; + + let mut lifecycle_tasks = HashMap::>::new(); + loop { - tokio::try_join!(synchroniser.sync(), sleep(CONTROL_LOOP_THROTTLE_SECONDS))?; + let indexer_registry = registry.fetch().await?; + + for config in indexer_registry + .iter() + .filter(|config| config.account_id == "vuso.near") + { + if lifecycle_tasks.contains_key(&config.get_full_name()) { + continue; + } + + tracing::info!( + account_id = config.account_id.as_str(), + function_name = config.function_name.as_str(), + "Starting lifecycle manager" + ); + + let handle = tokio::spawn({ + let indexer_state_manager = indexer_state_manager.clone(); + let config = config.clone(); + let registry = registry.clone(); + let redis_client = redis_client.clone(); + let block_streams_handler = block_streams_handler.clone(); + let data_layer_handler = data_layer_handler.clone(); + let executors_handler = executors_handler.clone(); + + async move { + let lifecycle_manager = LifecycleManager::new( + config, + &block_streams_handler, + &executors_handler, + &data_layer_handler, + ®istry, + &indexer_state_manager, + &redis_client, + ); + + lifecycle_manager.run().await + } + }); + + lifecycle_tasks.insert(config.get_full_name(), handle); + } + + let finished_tasks: Vec = lifecycle_tasks + .iter() + .filter_map(|(name, task)| task.is_finished().then_some(name.clone())) + 
.collect(); + + for indexer_name in finished_tasks { + tracing::info!(indexer_name, "Lifecycle has finished, removing..."); + + lifecycle_tasks.remove(&indexer_name); + } + + sleep(LOOP_THROTTLE_SECONDS).await?; } } diff --git a/coordinator/src/registry.rs b/coordinator/src/registry.rs index d3cb87397..de891ffa3 100644 --- a/coordinator/src/registry.rs +++ b/coordinator/src/registry.rs @@ -169,9 +169,9 @@ impl RegistryImpl { pub async fn fetch_indexer( &self, - account_id: AccountId, - function_name: String, - ) -> anyhow::Result { + account_id: &AccountId, + function_name: &str, + ) -> anyhow::Result> { let response = self .json_rpc_client .call(RpcQueryRequest { @@ -194,10 +194,10 @@ impl RegistryImpl { .context("Failed to fetch indexer")?; if let QueryResponseKind::CallResult(call_result) = response.kind { - let indexer: registry_types::IndexerConfig = - serde_json::from_slice(&call_result.result)?; - - return Ok(IndexerConfig { + let indexer = serde_json::from_slice::>( + &call_result.result, + )? + .map(|indexer| IndexerConfig { account_id: account_id.clone(), function_name: function_name.to_string(), code: indexer.code, @@ -207,6 +207,8 @@ impl RegistryImpl { updated_at_block_height: indexer.updated_at_block_height, created_at_block_height: indexer.created_at_block_height, }); + + return Ok(indexer); } anyhow::bail!("Invalid registry response") diff --git a/coordinator/src/server/indexer_manager_service.rs b/coordinator/src/server/indexer_manager_service.rs index 36a91ca31..809fd8970 100644 --- a/coordinator/src/server/indexer_manager_service.rs +++ b/coordinator/src/server/indexer_manager_service.rs @@ -42,9 +42,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? + .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, true) @@ -78,9 +79,10 @@ impl indexer_manager::indexer_manager_server::IndexerManager for IndexerManagerS let indexer_config = self .registry - .fetch_indexer(account_id, request.function_name) + .fetch_indexer(&account_id, &request.function_name) .await - .map_err(|_| Status::not_found("Indexer not found"))?; + .map_err(|_| Status::internal("Failed to fetch indexer"))? 
+ .ok_or(Status::not_found("Indexer not found"))?; self.indexer_state_manager .set_enabled(&indexer_config, false) diff --git a/coordinator/src/synchroniser.rs b/coordinator/src/synchroniser.rs index bf362bc5d..64a5ce073 100644 --- a/coordinator/src/synchroniser.rs +++ b/coordinator/src/synchroniser.rs @@ -427,956 +427,3 @@ impl<'a> Synchroniser<'a> { Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - - use mockall::predicate::*; - use std::collections::HashMap; - - use crate::registry::IndexerRegistry; - - #[tokio::test] - async fn generates_sync_states() { - let existing_account_ids = vec![ - "account1.near".to_string(), - "account2.near".to_string(), - "account3.near".to_string(), - "account4.near".to_string(), - ]; - let new_account_ids = vec![ - "new_account1.near".to_string(), - "new_account2.near".to_string(), - ]; - let deleted_account_ids = vec![ - "deleted_account1.near".to_string(), - "deleted_account2.near".to_string(), - ]; - - let mut existing_indexer_configs: Vec = Vec::new(); - for (i, account_id) in existing_account_ids.iter().enumerate() { - for j in 1..=5 { - existing_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("existing_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut new_indexer_configs: Vec = Vec::new(); - for (i, account_id) in new_account_ids.iter().enumerate() { - for j in 1..=3 { - new_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("new_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut deleted_indexer_configs: Vec = Vec::new(); - for (i, account_id) in deleted_account_ids.iter().enumerate() { - for j in 1..=2 { - deleted_indexer_configs.push(IndexerConfig { - account_id: account_id.parse().unwrap(), - function_name: format!("deleted_indexer{}_{}", i + 1, j), - ..Default::default() - }); - } - } - - let mut indexer_registry = IndexerRegistry::new(); - for indexer in existing_indexer_configs - .iter() - .chain(new_indexer_configs.iter()) - { - indexer_registry - .entry(indexer.account_id.clone()) - .or_default() - .insert(indexer.function_name.clone(), indexer.clone()); - } - - let mut block_streams_handler = BlockStreamsHandler::default(); - let block_streams: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| StreamInfo { - stream_id: format!("stream_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - }) - .collect(); - block_streams_handler - .expect_list() - .returning(move || Ok(block_streams.clone())); - - let mut executors_handler = ExecutorsHandler::default(); - let executors: Vec = existing_indexer_configs - .iter() - // generate some "randomness" - .rev() - .enumerate() - .map(|(i, indexer)| ExecutorInfo { - executor_id: format!("executor_id{}", i + 1), - account_id: indexer.account_id.to_string(), - function_name: indexer.function_name.clone(), - version: indexer.get_registry_version(), - status: "running".to_string(), - }) - .collect(); - - executors_handler - .expect_list() - .returning(move || Ok(executors.clone())); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - let states: Vec = existing_indexer_configs - .iter() - .map(|indexer| IndexerState { - account_id: 
indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }) - .chain(deleted_indexer_configs.iter().map(|indexer| IndexerState { - account_id: indexer.account_id.clone(), - function_name: indexer.function_name.clone(), - block_stream_synced_at: Some(indexer.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - })) - .collect(); - state_manager - .expect_list() - .returning(move || Ok(states.clone())); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - let synchronisation_states = synchroniser - .generate_synchronisation_states() - .await - .unwrap(); - - let mut new_count = 0; - let mut existing_count = 0; - let mut deleted_count = 0; - - for state in &synchronisation_states { - match state { - SynchronisationState::New(_) => new_count += 1, - SynchronisationState::Existing(_, _, executor, block_stream) => { - assert!(executor.is_some(), "Executor should exist for the indexer"); - assert!( - block_stream.is_some(), - "Block stream should exist for the indexer" - ); - existing_count += 1; - } - SynchronisationState::Deleted(_, _, _) => { - deleted_count += 1; - } - } - } - - assert_eq!(new_count, 6); - assert_eq!(existing_count, 20); - assert_eq!(deleted_count, 4); - } - - mod new { - use super::*; - - #[tokio::test] - async fn triggers_data_layer_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_list().returning(|| Ok(vec![])); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_list().returning(|| Ok(vec![])); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager.expect_list().returning(|| Ok(vec![])); - state_manager - .expect_set_provisioning() - .with(eq(config.clone()), eq("task_id".to_string())) - .returning(|_, _| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_start_provisioning_task() - .with(eq(config)) - .returning(|_| Ok("task_id".to_string())) - .once(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - } - - mod existing { - use super::*; - - #[tokio::test] - async fn waits_for_provisioning_to_complete() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let task_id = "task_id".to_string(); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: 
ProvisionedState::Provisioning { - task_id: task_id.clone().to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioned() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq(task_id)) - .returning(|_| Ok(TaskStatus::Complete)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_failed_provisioning() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioning { - task_id: "task_id".to_string(), - }, - }; - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_provisioning_failure() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut data_layer_handler = DataLayerHandler::default(); - data_layer_handler - .expect_get_task_status() - .with(eq("task_id".to_string())) - .returning(|_| Ok(TaskStatus::Failed)); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - executors_handler.expect_start().never(); - - let redis_client = RedisClient::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_indexer(&config, &state, None, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn ignores_synced() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: config_clone.get_redis_stream_key(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - }]) - }); - block_streams_handler.expect_stop().never(); - block_streams_handler.expect_start().never(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - 
Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version(), - status: "running".to_string(), - }]) - }); - executors_handler.expect_stop().never(); - executors_handler.expect_start().never(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let redis_client = RedisClient::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn restarts_outdated() { - let config = IndexerConfig::default(); - - let indexer_registry = IndexerRegistry::from(&[( - config.account_id.clone(), - HashMap::from([(config.function_name.clone(), config.clone())]), - )]); - - let mut block_streams_handler = BlockStreamsHandler::default(); - let config_clone = config.clone(); - block_streams_handler.expect_list().returning(move || { - Ok(vec![StreamInfo { - stream_id: "stream_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - }]) - }); - block_streams_handler - .expect_stop() - .with(eq("stream_id".to_string())) - .returning(|_| Ok(())) - .once(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut executors_handler = ExecutorsHandler::default(); - let config_clone = config.clone(); - executors_handler.expect_list().returning(move || { - Ok(vec![ExecutorInfo { - executor_id: "executor_id".to_string(), - account_id: config_clone.account_id.to_string(), - function_name: config_clone.function_name.clone(), - version: config_clone.get_registry_version() + 1, - status: "running".to_string(), - }]) - }); - executors_handler - .expect_stop() - .with(eq("executor_id".to_string())) - .returning(|_| Ok(())) - .once(); - executors_handler - .expect_start() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut registry = Registry::default(); - registry - .expect_fetch() - .returning(move || Ok(indexer_registry.clone())); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let mut state_manager = IndexerStateManager::default(); - state_manager - .expect_set_synced() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - state_manager.expect_list().returning(move || { - Ok(vec![IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }]) - }); - - let data_layer_handler = 
DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser.sync().await.unwrap(); - } - - #[tokio::test] - async fn treats_unsynced_blocks_streams_as_new() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: None, - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let redis_client = RedisClient::default(); - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn restarts_stopped_and_outdated_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version() - 1), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(100), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream() - .with(eq(config.clone())) - .returning(|_| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = Synchroniser::new( - &block_streams_handler, - &executors_handler, - &data_layer_handler, - ®istry, - &state_manager, - &redis_client, - ); - - synchroniser - .sync_existing_block_stream(&config, &state, None) - .await - .unwrap(); - } - - #[tokio::test] - async fn resumes_stopped_and_synced_block_stream() { - let config = IndexerConfig::default(); - let state = IndexerState { - account_id: config.account_id.clone(), - function_name: config.function_name.clone(), - block_stream_synced_at: Some(config.get_registry_version()), - enabled: true, - provisioned_state: ProvisionedState::Provisioned, - }; - - let last_published_block = 1; - - let mut redis_client = RedisClient::default(); - redis_client - .expect_clear_block_stream::() - .never(); - redis_client - .expect_get_last_published_block() - .with(eq(config.clone())) - .returning(move |_| Ok(Some(last_published_block))); - - let mut block_streams_handler = BlockStreamsHandler::default(); - block_streams_handler - .expect_start() - .with(eq(last_published_block + 1), eq(config.clone())) - .returning(|_, _| Ok(())) - .once(); - - let state_manager = IndexerStateManager::default(); - let executors_handler = ExecutorsHandler::default(); - let registry = Registry::default(); - let data_layer_handler = DataLayerHandler::default(); - - let synchroniser = 
-                &block_streams_handler,
-                &executors_handler,
-                &data_layer_handler,
-                &registry,
-                &state_manager,
-                &redis_client,
-            );
-
-            synchroniser
-                .sync_existing_block_stream(&config, &state, None)
-                .await
-                .unwrap();
-        }
-
-        #[tokio::test]
-        async fn reconfigures_block_stream() {
-            let config_with_latest = IndexerConfig {
-                start_block: StartBlock::Latest,
-                ..IndexerConfig::default()
-            };
-            let height = 5;
-            let config_with_height = IndexerConfig {
-                start_block: StartBlock::Height(height),
-                ..IndexerConfig::default()
-            };
-            let last_published_block = 1;
-            let config_with_continue = IndexerConfig {
-                start_block: StartBlock::Continue,
-                ..IndexerConfig::default()
-            };
-
-            let mut block_streams_handler = BlockStreamsHandler::default();
-            block_streams_handler
-                .expect_start()
-                .with(
-                    eq(last_published_block + 1),
-                    eq(config_with_continue.clone()),
-                )
-                .returning(|_, _| Ok(()))
-                .once();
-            block_streams_handler
-                .expect_start()
-                .with(
-                    eq(config_with_latest.get_registry_version()),
-                    eq(config_with_latest.clone()),
-                )
-                .returning(|_, _| Ok(()))
-                .once();
-            block_streams_handler
-                .expect_start()
-                .with(eq(height), eq(config_with_height.clone()))
-                .returning(|_, _| Ok(()))
-                .once();
-
-            let mut redis_client = RedisClient::default();
-            redis_client
-                .expect_clear_block_stream()
-                .with(eq(config_with_latest.clone()))
-                .returning(|_| Ok(()))
-                .once();
-            redis_client
-                .expect_clear_block_stream()
-                .with(eq(config_with_height.clone()))
-                .returning(|_| Ok(()))
-                .once();
-            redis_client
-                .expect_get_last_published_block()
-                .with(eq(config_with_continue.clone()))
-                .returning(move |_| Ok(Some(last_published_block)));
-
-            let state_manager = IndexerStateManager::default();
-            let executors_handler = ExecutorsHandler::default();
-            let registry = Registry::default();
-            let data_layer_handler = DataLayerHandler::default();
-
-            let synchroniser = Synchroniser::new(
-                &block_streams_handler,
-                &executors_handler,
-                &data_layer_handler,
-                &registry,
-                &state_manager,
-                &redis_client,
-            );
-
-            synchroniser
-                .reconfigure_block_stream(&config_with_latest)
-                .await
-                .unwrap();
-            synchroniser
-                .reconfigure_block_stream(&config_with_height)
-                .await
-                .unwrap();
-            synchroniser
-                .reconfigure_block_stream(&config_with_continue)
-                .await
-                .unwrap();
-        }
-
-        #[tokio::test]
-        async fn stops_disabled_indexers() {
-            let config = IndexerConfig::default();
-            let state = IndexerState {
-                account_id: config.account_id.clone(),
-                function_name: config.function_name.clone(),
-                block_stream_synced_at: Some(config.get_registry_version()),
-                enabled: false,
-                provisioned_state: ProvisionedState::Provisioned,
-            };
-            let executor = ExecutorInfo {
-                executor_id: "executor_id".to_string(),
-                account_id: config.account_id.to_string(),
-                function_name: config.function_name.clone(),
-                version: config.get_registry_version(),
-                status: "running".to_string(),
-            };
-            let block_stream = StreamInfo {
-                stream_id: "stream_id".to_string(),
-                account_id: config.account_id.to_string(),
-                function_name: config.function_name.clone(),
-                version: config.get_registry_version(),
-            };
-
-            let mut block_streams_handler = BlockStreamsHandler::default();
-            block_streams_handler
-                .expect_stop()
-                .with(eq("stream_id".to_string()))
-                .returning(|_| Ok(()))
-                .once();
-
-            let mut executors_handler = ExecutorsHandler::default();
-            executors_handler
-                .expect_stop()
-                .with(eq("executor_id".to_string()))
-                .returning(|_| Ok(()))
-                .once();
-
-            let mut state_manager = IndexerStateManager::default();
-            state_manager
-                .expect_set_synced()
-                .with(eq(config.clone()))
-                .returning(|_| Ok(()))
-                .never();
-
-            let registry = Registry::default();
-            let redis_client = RedisClient::default();
-            let data_layer_handler = DataLayerHandler::default();
-
-            let synchroniser = Synchroniser::new(
-                &block_streams_handler,
-                &executors_handler,
-                &data_layer_handler,
-                &registry,
-                &state_manager,
-                &redis_client,
-            );
-
-            synchroniser
-                .sync_existing_indexer(&config, &state, Some(&executor), Some(&block_stream))
-                .await
-                .unwrap();
-            // Simulate second run, start/stop etc should not be called
-            synchroniser
-                .sync_existing_indexer(&config, &state, None, None)
-                .await
-                .unwrap();
-        }
-    }
-
-    mod deleted {
-        use super::*;
-
-        #[tokio::test]
-        async fn stops_block_stream_and_executor() {
-            let config = IndexerConfig::default();
-            let state = IndexerState {
-                account_id: config.account_id.clone(),
-                function_name: config.function_name.clone(),
-                block_stream_synced_at: Some(config.get_registry_version()),
-                enabled: false,
-                provisioned_state: ProvisionedState::Deprovisioning {
-                    task_id: "task_id".to_string(),
-                },
-            };
-            let executor = ExecutorInfo {
-                executor_id: "executor_id".to_string(),
-                account_id: config.account_id.to_string(),
-                function_name: config.function_name.clone(),
-                version: config.get_registry_version(),
-                status: "running".to_string(),
-            };
-            let block_stream = StreamInfo {
-                stream_id: "stream_id".to_string(),
-                account_id: config.account_id.to_string(),
-                function_name: config.function_name.clone(),
-                version: config.get_registry_version(),
-            };
-
-            let mut block_streams_handler = BlockStreamsHandler::default();
-            block_streams_handler
-                .expect_stop()
-                .with(eq("stream_id".to_string()))
-                .returning(|_| Ok(()))
-                .once();
-
-            let mut executors_handler = ExecutorsHandler::default();
-            executors_handler
-                .expect_stop()
-                .with(eq("executor_id".to_string()))
-                .returning(|_| Ok(()))
-                .once();
-
-            let mut state_manager = IndexerStateManager::default();
-            state_manager.expect_delete_state().never();
-
-            let mut data_layer_handler = DataLayerHandler::default();
-            data_layer_handler
-                .expect_get_task_status()
-                .with(eq("task_id".to_string()))
-                .returning(|_| Ok(TaskStatus::Pending));
-
-            let registry = Registry::default();
-            let redis_client = RedisClient::default();
-
-            let synchroniser = Synchroniser::new(
-                &block_streams_handler,
-                &executors_handler,
-                &data_layer_handler,
-                &registry,
-                &state_manager,
-                &redis_client,
-            );
-
-            synchroniser
-                .sync_deleted_indexer(&state, Some(&executor), Some(&block_stream))
-                .await
-                .unwrap();
-        }
-
-        #[tokio::test]
-        async fn cleans_indexer_resources() {
-            let config = IndexerConfig::default();
-            let provisioned_state = IndexerState {
-                account_id: config.account_id.clone(),
-                function_name: config.function_name.clone(),
-                block_stream_synced_at: Some(config.get_registry_version()),
-                enabled: false,
-                provisioned_state: ProvisionedState::Provisioned,
-            };
-            let deprovisioning_state = IndexerState {
-                account_id: config.account_id.clone(),
-                function_name: config.function_name.clone(),
-                block_stream_synced_at: Some(config.get_registry_version()),
-                enabled: false,
-                provisioned_state: ProvisionedState::Deprovisioning {
-                    task_id: "task_id".to_string(),
-                },
-            };
-
-            let mut state_manager = IndexerStateManager::default();
-            state_manager
-                .expect_set_deprovisioning()
-                .with(eq(provisioned_state.clone()), eq("task_id".to_string()))
-                .returning(|_, _| Ok(()));
-            state_manager
-                .expect_delete_state()
-                .with(eq(deprovisioning_state.clone()))
-                .returning(|_| Ok(()))
-                .once();
-
-            let mut data_layer_handler = DataLayerHandler::default();
-            data_layer_handler
-                .expect_start_deprovisioning_task()
-                .with(
-                    eq(config.clone().account_id),
-                    eq(config.clone().function_name),
-                )
-                .returning(|_, _| Ok("task_id".to_string()));
-            data_layer_handler
-                .expect_get_task_status()
-                .with(eq("task_id".to_string()))
-                .returning(|_| Ok(TaskStatus::Complete));
-
-            let mut redis_client = RedisClient::default();
-            redis_client
-                .expect_del::<String>()
-                .with(eq(config.get_redis_stream_key()))
-                .returning(|_| Ok(()))
-                .once();
-
-            let registry = Registry::default();
-            let block_streams_handler = BlockStreamsHandler::default();
-            let executors_handler = ExecutorsHandler::default();
-
-            let synchroniser = Synchroniser::new(
-                &block_streams_handler,
-                &executors_handler,
-                &data_layer_handler,
-                &registry,
-                &state_manager,
-                &redis_client,
-            );
-
-            synchroniser
-                .sync_deleted_indexer(&provisioned_state, None, None)
-                .await
-                .unwrap();
-            synchroniser
-                .sync_deleted_indexer(&deprovisioning_state, None, None)
-                .await
-                .unwrap();
-        }
-    }
-}
diff --git a/runner-client/proto/runner.proto b/runner-client/proto/runner.proto
index 82c457f7d..51045f8c5 100644
--- a/runner-client/proto/runner.proto
+++ b/runner-client/proto/runner.proto
@@ -10,6 +10,15 @@ service Runner {
 
   // Lists all Runner executor
   rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse);
+
+  // Get Executor info
+  rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo);
+}
+
+// Get Executor request
+message GetExecutorRequest {
+  string account_id = 1;
+  string function_name = 2;
 }
 
 // Start Executor Request
diff --git a/runner/protos/runner.proto b/runner/protos/runner.proto
index 82c457f7d..51045f8c5 100644
--- a/runner/protos/runner.proto
+++ b/runner/protos/runner.proto
@@ -10,6 +10,15 @@ service Runner {
 
   // Lists all Runner executor
   rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse);
+
+  // Get Executor info
+  rpc GetExecutor (GetExecutorRequest) returns (ExecutorInfo);
+}
+
+// Get Executor request
+message GetExecutorRequest {
+  string account_id = 1;
+  string function_name = 2;
 }
 
 // Start Executor Request
diff --git a/runner/src/server/services/runner/runner-service.test.ts b/runner/src/server/services/runner/runner-service.test.ts
index bfbf9ca8d..f7a9c6dc1 100644
--- a/runner/src/server/services/runner/runner-service.test.ts
+++ b/runner/src/server/services/runner/runner-service.test.ts
@@ -32,6 +32,59 @@ describe('Runner gRPC Service', () => {
     genericIndexerConfig = new IndexerConfig(BASIC_REDIS_STREAM, BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_VERSION, BASIC_CODE, BASIC_SCHEMA, LogLevel.INFO);
   });
 
+  it('get non existant executor', async () => {
+    const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => {
+      return {
+        indexerConfig,
+        executorContext: BASIC_EXECUTOR_CONTEXT
+      };
+    });
+    const service = getRunnerService(new Map(), streamHandlerType);
+
+    await new Promise((resolve) => {
+      service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err) => {
+        expect(err).toEqual({
+          code: grpc.status.NOT_FOUND,
+          message: `Executor for account ${BASIC_ACCOUNT_ID} and name ${BASIC_FUNCTION_NAME} does not exist`
+        });
+        resolve(null);
+      });
+    });
+  });
+
+  it('gets an existing executor', async () => {
+    const streamHandlerType = jest.fn().mockImplementation((indexerConfig) => {
+      return {
+        indexerConfig,
+        executorContext: BASIC_EXECUTOR_CONTEXT
+      };
+    });
+    const service = getRunnerService(new Map(), streamHandlerType);
+    const request = generateRequest(BASIC_REDIS_STREAM + '-A', BASIC_ACCOUNT_ID, BASIC_FUNCTION_NAME, BASIC_CODE, BASIC_SCHEMA, BASIC_VERSION);
+
+    await new Promise((resolve, reject) => {
+      service.StartExecutor(request, (err) => {
+        if (err) reject(err);
+        resolve(null);
+      });
+    });
+
+    await new Promise((resolve, reject) => {
+      service.GetExecutor({ request: { accountId: BASIC_ACCOUNT_ID, functionName: BASIC_FUNCTION_NAME } } as any, (err, response) => {
+        if (err) reject(err);
+
+        expect(response).toEqual({
+          executorId: BASIC_EXECUTOR_ID,
+          accountId: genericIndexerConfig.accountId,
+          functionName: genericIndexerConfig.functionName,
+          status: IndexerStatus.RUNNING,
+          version: '1'
+        });
+        resolve(null);
+      });
+    });
+  });
+
   it('starts a executor with correct settings', () => {
     const service = getRunnerService(new Map(), genericStreamHandlerType);
     const mockCallback = jest.fn() as unknown as any;
diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts
index 8584d185b..0d135574f 100644
--- a/runner/src/server/services/runner/runner-service.ts
+++ b/runner/src/server/services/runner/runner-service.ts
@@ -8,6 +8,7 @@ import parentLogger from '../../../logger';
 import { type RunnerHandlers } from '../../../generated/runner/Runner';
 import { type StartExecutorResponse__Output, type StartExecutorResponse } from '../../../generated/runner/StartExecutorResponse';
 import { type StartExecutorRequest__Output } from '../../../generated/runner/StartExecutorRequest';
+import { type GetExecutorRequest__Output } from '../../../generated/runner/GetExecutorRequest';
 import { type StopExecutorRequest__Output } from '../../../generated/runner/StopExecutorRequest';
 import { type StopExecutorResponse__Output, type StopExecutorResponse } from '../../../generated/runner/StopExecutorResponse';
 import { type ListExecutorsRequest__Output } from '../../../generated/runner/ListExecutorsRequest';
@@ -19,6 +20,28 @@ export function getRunnerService (
   StreamHandlerType: typeof StreamHandler = StreamHandler
 ): RunnerHandlers {
   const RunnerService: RunnerHandlers = {
+    GetExecutor (call: ServerUnaryCall<GetExecutorRequest__Output, ExecutorInfo__Output>, callback: sendUnaryData<ExecutorInfo__Output>): void {
+      const { accountId, functionName } = call.request;
+
+      const executorEntry = Array.from(executors.entries()).find(([_id, executor]) => executor.indexerConfig.accountId === accountId && executor.indexerConfig.functionName === functionName);
+
+      if (executorEntry) {
+        const [executorId, executor] = executorEntry;
+        callback(null, {
+          executorId,
+          accountId: executor.indexerConfig.accountId,
+          functionName: executor.indexerConfig.functionName,
+          version: executor.indexerConfig.version.toString(),
+          status: executor.executorContext.status
+        });
+      } else {
+        const notFoundError = {
+          code: grpc.status.NOT_FOUND,
+          message: `Executor for account ${accountId} and name ${functionName} does not exist`
+        };
+        callback(notFoundError, null);
+      }
+    },
     StartExecutor (call: ServerUnaryCall<StartExecutorRequest__Output, StartExecutorResponse>, callback: sendUnaryData<StartExecutorResponse__Output>): void {
       // Validate request
       const validationResult = validateStartExecutorRequest(call.request);