Skip to content

Commit

Permalink
refactor: Rename internal indexer state
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed May 9, 2024
1 parent 06d9ad9 commit 550eefe
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 38 deletions.
36 changes: 20 additions & 16 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ async fn synchronise_block_stream(
.await?;
}

let sync_status = indexer_manager.get_sync_status(indexer_config).await?;
let sync_status = indexer_manager
.get_block_stream_sync_status(indexer_config)
.await?;

clear_block_stream_if_needed(&sync_status, indexer_config, redis_client).await?;

Expand All @@ -105,7 +107,9 @@ async fn synchronise_block_stream(
.start(start_block_height, indexer_config)
.await?;

indexer_manager.set_synced(indexer_config).await?;
indexer_manager
.set_block_stream_synced(indexer_config)
.await?;

Ok(())
}
Expand Down Expand Up @@ -187,11 +191,11 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Synced));
mock_indexer_manager
.expect_set_synced()
.expect_set_block_stream_synced()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();
Expand Down Expand Up @@ -245,11 +249,11 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Outdated));
mock_indexer_manager
.expect_set_synced()
.expect_set_block_stream_synced()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();
Expand Down Expand Up @@ -302,11 +306,11 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Outdated));
mock_indexer_manager
.expect_set_synced()
.expect_set_block_stream_synced()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();
Expand Down Expand Up @@ -359,11 +363,11 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Outdated));
mock_indexer_manager
.expect_set_synced()
.expect_set_block_stream_synced()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();
Expand Down Expand Up @@ -451,7 +455,7 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Synced));

Expand Down Expand Up @@ -499,11 +503,11 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Outdated));
mock_indexer_manager
.expect_set_synced()
.expect_set_block_stream_synced()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();
Expand Down Expand Up @@ -567,7 +571,7 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::Outdated));

Expand Down Expand Up @@ -615,11 +619,11 @@ mod tests {

let mut mock_indexer_manager = IndexerStateManager::default();
mock_indexer_manager
.expect_get_sync_status()
.expect_get_block_stream_sync_status()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(SyncStatus::New));
mock_indexer_manager
.expect_set_synced()
.expect_set_block_stream_synced()
.with(predicate::eq(indexer_config.clone()))
.returning(|_| Ok(()))
.once();
Expand Down
42 changes: 20 additions & 22 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,18 @@ pub enum SyncStatus {

#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
struct IndexerState {
// block_stream_synced_at/executor_synced_at? to?
synced_at_block_height: u64,
block_stream_synced_at: u64,
}

#[cfg(not(test))]
pub use IndexerStateManagerImpl as IndexerStateManager;
#[cfg(test)]
pub use MockIndexerStateManagerImpl as IndexerStateManager;

// binary semaphore to protect updating redis simultaneously
// or wrap redis in a mutex
pub struct IndexerStateManagerImpl {
redis_client: RedisClient,
}

// IndexerStateManager?
// StateManager?
#[cfg_attr(test, mockall::automock)]
impl IndexerStateManagerImpl {
pub fn new(redis_client: RedisClient) -> Self {
Expand Down Expand Up @@ -75,7 +70,7 @@ impl IndexerStateManagerImpl {
self.set_state(
indexer_config,
IndexerState {
synced_at_block_height: version,
block_stream_synced_at: version,
},
)
.await?;
Expand All @@ -90,7 +85,7 @@ impl IndexerStateManagerImpl {
Ok(())
}

pub async fn get_sync_status(
pub async fn get_block_stream_sync_status(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<SyncStatus> {
Expand All @@ -104,7 +99,7 @@ impl IndexerStateManagerImpl {

match indexer_config
.get_registry_version()
.cmp(&indexer_state.synced_at_block_height)
.cmp(&indexer_state.block_stream_synced_at)
{
Ordering::Equal => Ok(SyncStatus::Synced),
Ordering::Greater => Ok(SyncStatus::Outdated),
Expand All @@ -118,10 +113,13 @@ impl IndexerStateManagerImpl {
}
}

pub async fn set_synced(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn set_block_stream_synced(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?.unwrap_or_default();

indexer_state.synced_at_block_height = indexer_config.get_registry_version();
indexer_state.block_stream_synced_at = indexer_config.get_registry_version();

self.set_state(indexer_config, indexer_state).await?;

Expand Down Expand Up @@ -201,15 +199,15 @@ mod tests {
.expect_set_indexer_state()
.with(
predicate::eq(morgs_config),
predicate::eq(serde_json::json!({ "synced_at_block_height": 200 }).to_string()),
predicate::eq(serde_json::json!({ "block_stream_synced_at": 200 }).to_string()),
)
.returning(|_, _| Ok(()))
.once();
mock_redis_client
.expect_set_indexer_state()
.with(
predicate::eq(darunrs_config),
predicate::eq(serde_json::json!({ "synced_at_block_height": 1 }).to_string()),
predicate::eq(serde_json::json!({ "block_stream_synced_at": 1 }).to_string()),
)
.returning(|_, _| Ok(()))
.once();
Expand Down Expand Up @@ -261,7 +259,7 @@ mod tests {
.expect_set_indexer_state()
.with(
predicate::eq(morgs_config),
predicate::eq(serde_json::json!({ "synced_at_block_height": 200 }).to_string()),
predicate::eq(serde_json::json!({ "block_stream_synced_at": 200 }).to_string()),
)
.returning(|_, _| Ok(()))
.never();
Expand All @@ -275,7 +273,7 @@ mod tests {
}

#[tokio::test]
pub async fn outdated_indexer() {
pub async fn outdated_block_stream() {
let indexer_config = IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
Expand All @@ -296,21 +294,21 @@ mod tests {
.with(predicate::eq(indexer_config.clone()))
.returning(|_| {
Ok(Some(
serde_json::json!({ "synced_at_block_height": 300 }).to_string(),
serde_json::json!({ "block_stream_synced_at": 300 }).to_string(),
))
});

let indexer_manager = IndexerStateManagerImpl::new(redis_client);
let result = indexer_manager
.get_sync_status(&indexer_config)
.get_block_stream_sync_status(&indexer_config)
.await
.unwrap();

assert_eq!(result, SyncStatus::Outdated);
}

#[tokio::test]
pub async fn synced_indexer() {
pub async fn synced_block_stream() {
let indexer_config = IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
Expand All @@ -331,21 +329,21 @@ mod tests {
.with(predicate::eq(indexer_config.clone()))
.returning(|_| {
Ok(Some(
serde_json::json!({ "synced_at_block_height": 200 }).to_string(),
serde_json::json!({ "block_stream_synced_at": 200 }).to_string(),
))
});

let indexer_manager = IndexerStateManagerImpl::new(redis_client);
let result = indexer_manager
.get_sync_status(&indexer_config)
.get_block_stream_sync_status(&indexer_config)
.await
.unwrap();

assert_eq!(result, SyncStatus::Synced);
}

#[tokio::test]
pub async fn new_indexer() {
pub async fn new_block_stream() {
let indexer_config = IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
Expand All @@ -368,7 +366,7 @@ mod tests {

let indexer_manager = IndexerStateManagerImpl::new(redis_client);
let result = indexer_manager
.get_sync_status(&indexer_config)
.get_block_stream_sync_status(&indexer_config)
.await
.unwrap();

Expand Down

0 comments on commit 550eefe

Please sign in to comment.