-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Expose RPC from Coordinator to enable/disable Indexers #729
Conversation
65d8e5d
to
7a27d05
Compare
7a27d05
to
a9035d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall! Could you also add the new env variable to the Docker Compose? Also, as far as running grpcurl, how would we do that in the dev/prod machines? Should we install grpcurl in the Dockerfile so that it is available if you enter the docker container?
pub async fn migrate_state_if_needed( | ||
&self, | ||
indexer_registry: &IndexerRegistry, | ||
) -> anyhow::Result<()> { | ||
if self.redis_client.is_migration_complete().await?.is_some() { | ||
return Ok(()); | ||
if self.redis_client.is_migration_complete().await?.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we still check for and try to perform the original migration? Wasn't it completed in both dev and prod already anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry, I meant to call this out - Only dev has been migrated, so we still need this so prod can be migrated. They also need to be separate steps because dev is partially migrated. Essentially, this code will work for both dev/prod, even with them being in different states.
.await? | ||
.is_none() | ||
{ | ||
tracing::info!("Migrating enabled flag"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I feel like this is the log which should say "Migrating indexer state" whereas the section above this should be maybe "Completing old migration" or something.
Or at least "Migration flag enabled. Migrating...".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first migration will not run for dev, only prod, and for prod it isn't really "old".
These are only temporary logs, as the migration will be removed once completed. I just wanted something to signify start/end, the logs themselves don't really matter.
block_stream_synced_at: Option<u64>, | ||
} | ||
|
||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | ||
pub struct IndexerState { | ||
pub block_stream_synced_at: Option<u64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this? What about simply removing it? Even if migrating state goes wrong, losing this information doesn't impact us, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This is the "stream version", previously stored under e.g. morgs.near/test:block_stream:version
, but rather than have it as a separate key it's stored with under Indexer state. This is what prevents us from starting a historical backfill again.
.with(predicate::eq(darunrs_config.clone())) | ||
.returning(|_| { | ||
Ok(Some( | ||
serde_json::json!({ "block_stream_synced_at": 1, "enabled": false }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How dare you disable my indexer! 😡 /s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😅
@darunrs we'll need to ssh tunnel to the machine like we do with the 'examples', and can then run |
This PR exposes a new gRPC endpoint from Coordinator to "manage" indexers. Currently, this only allows for enabling/disabling, but will probably be expanded over time. There isn't any intention to use this from another service, it's more of a manual & internal tool that we can use.
The endpoint is essentially just a wrapper over the persistent Redis state. The exposed methods end up mutating this state, which in turn, is then used to govern how Indexers should be synchronised.
Within the
coordinator/
directory, the endpoint can be used withgrpcurl
like so:grpcurl -plaintext -proto proto/indexer_manager.proto -d '{"account_id": "morgs.near", "function_name": "test"}' 0.0.0.0:8002 indexer.IndexerManager.Enable
grpcurl -plaintext -proto proto/indexer_manager.proto -d '{"account_id": "morgs.near", "function_name": "test"}' 0.0.0.0:8002 indexer.IndexerManager.Disable
grpcurl -plaintext -proto proto/indexer_manager.proto 0.0.0.0:8002 indexer.IndexerManager.List