From 4a752e74f9bcc430353e9057b137de050bb552d1 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Fri, 10 May 2024 09:31:25 +1200 Subject: [PATCH] refactor: Rename internal indexer state --- coordinator/src/block_streams/synchronise.rs | 36 +++++++++++--------- coordinator/src/indexer_state.rs | 32 ++++++++--------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/coordinator/src/block_streams/synchronise.rs b/coordinator/src/block_streams/synchronise.rs index c6f3dc8fb..fc04a0452 100644 --- a/coordinator/src/block_streams/synchronise.rs +++ b/coordinator/src/block_streams/synchronise.rs @@ -89,7 +89,9 @@ async fn synchronise_block_stream( .await?; } - let sync_status = indexer_manager.get_sync_status(indexer_config).await?; + let sync_status = indexer_manager + .get_block_stream_sync_status(indexer_config) + .await?; clear_block_stream_if_needed(&sync_status, indexer_config, redis_client).await?; @@ -105,7 +107,9 @@ async fn synchronise_block_stream( .start(start_block_height, indexer_config) .await?; - indexer_manager.set_synced(indexer_config).await?; + indexer_manager + .set_block_stream_synced(indexer_config) + .await?; Ok(()) } @@ -187,11 +191,11 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Synced)); mock_indexer_manager - .expect_set_synced() + .expect_set_block_stream_synced() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(())) .once(); @@ -245,11 +249,11 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Outdated)); mock_indexer_manager - .expect_set_synced() + .expect_set_block_stream_synced() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(())) .once(); @@ -302,11 +306,11 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Outdated)); mock_indexer_manager - .expect_set_synced() + .expect_set_block_stream_synced() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(())) .once(); @@ -359,11 +363,11 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Outdated)); mock_indexer_manager - .expect_set_synced() + .expect_set_block_stream_synced() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(())) .once(); @@ -451,7 +455,7 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Synced)); @@ -499,11 +503,11 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Outdated)); mock_indexer_manager - .expect_set_synced() + .expect_set_block_stream_synced() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(())) .once(); @@ -567,7 +571,7 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::Outdated)); @@ -615,11 +619,11 @@ mod tests { let mut mock_indexer_manager = IndexerStateManager::default(); mock_indexer_manager - .expect_get_sync_status() + .expect_get_block_stream_sync_status() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(SyncStatus::New)); mock_indexer_manager - .expect_set_synced() + .expect_set_block_stream_synced() .with(predicate::eq(indexer_config.clone())) .returning(|_| Ok(())) .once(); diff --git a/coordinator/src/indexer_state.rs b/coordinator/src/indexer_state.rs index 90f73065e..33a90edfb 100644 --- a/coordinator/src/indexer_state.rs +++ b/coordinator/src/indexer_state.rs @@ -13,8 +13,7 @@ pub enum SyncStatus { #[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)] struct IndexerState { - // block_stream_synced_at/executor_synced_at? to? - synced_at_block_height: u64, + block_stream_synced_at: u64, } #[cfg(not(test))] @@ -22,14 +21,10 @@ pub use IndexerStateManagerImpl as IndexerStateManager; #[cfg(test)] pub use MockIndexerStateManagerImpl as IndexerStateManager; -// binary semaphore to protect updating redis simultaneously -// or wrap redis in a mutex pub struct IndexerStateManagerImpl { redis_client: RedisClient, } -// IndexerStateManager? -// StateManager? #[cfg_attr(test, mockall::automock)] impl IndexerStateManagerImpl { pub fn new(redis_client: RedisClient) -> Self { @@ -75,7 +70,7 @@ impl IndexerStateManagerImpl { self.set_state( indexer_config, IndexerState { - synced_at_block_height: version, + block_stream_synced_at: version, }, ) .await?; @@ -90,7 +85,7 @@ impl IndexerStateManagerImpl { Ok(()) } - pub async fn get_sync_status( + pub async fn get_block_stream_sync_status( &self, indexer_config: &IndexerConfig, ) -> anyhow::Result { @@ -104,7 +99,7 @@ impl IndexerStateManagerImpl { match indexer_config .get_registry_version() - .cmp(&indexer_state.synced_at_block_height) + .cmp(&indexer_state.block_stream_synced_at) { Ordering::Equal => Ok(SyncStatus::Synced), Ordering::Greater => Ok(SyncStatus::Outdated), @@ -118,10 +113,13 @@ impl IndexerStateManagerImpl { } } - pub async fn set_synced(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { + pub async fn set_block_stream_synced( + &self, + indexer_config: &IndexerConfig, + ) -> anyhow::Result<()> { let mut indexer_state = self.get_state(indexer_config).await?.unwrap_or_default(); - indexer_state.synced_at_block_height = indexer_config.get_registry_version(); + indexer_state.block_stream_synced_at = indexer_config.get_registry_version(); self.set_state(indexer_config, indexer_state).await?; @@ -275,7 +273,7 @@ mod tests { } #[tokio::test] - pub async fn outdated_indexer() { + pub async fn outdated_block_stream() { let indexer_config = IndexerConfig { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), @@ -302,7 +300,7 @@ mod tests { let indexer_manager = IndexerStateManagerImpl::new(redis_client); let result = indexer_manager - .get_sync_status(&indexer_config) + .get_block_stream_sync_status(&indexer_config) .await .unwrap(); @@ -310,7 +308,7 @@ mod tests { } #[tokio::test] - pub async fn synced_indexer() { + pub async fn synced_block_stream() { let indexer_config = IndexerConfig { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), @@ -337,7 +335,7 @@ mod tests { let indexer_manager = IndexerStateManagerImpl::new(redis_client); let result = indexer_manager - .get_sync_status(&indexer_config) + .get_block_stream_sync_status(&indexer_config) .await .unwrap(); @@ -345,7 +343,7 @@ mod tests { } #[tokio::test] - pub async fn new_indexer() { + pub async fn new_block_stream() { let indexer_config = IndexerConfig { account_id: "morgs.near".parse().unwrap(), function_name: "test".to_string(), @@ -368,7 +366,7 @@ mod tests { let indexer_manager = IndexerStateManagerImpl::new(redis_client); let result = indexer_manager - .get_sync_status(&indexer_config) + .get_block_stream_sync_status(&indexer_config) .await .unwrap();