Skip to content

Commit

Permalink
feat: Expose RPC from Coordinator to enable/disable Indexers (#729)
Browse files Browse the repository at this point in the history
This PR exposes a new gRPC endpoint from Coordinator to "manage"
indexers. Currently, this only allows for enabling/disabling, but will
probably be expanded over time. There isn't any intention to use this
from another service, it's more of a manual & internal tool that we can
use.

The endpoint is essentially just a wrapper over the persistent Redis
state. The exposed methods end up mutating this state, which in turn, is
then used to govern how Indexers should be synchronised.

Within the `coordinator/` directory, the endpoint can be used with
`grpcurl` like so:
- enable: `grpcurl -plaintext -proto proto/indexer_manager.proto -d
'{"account_id": "morgs.near", "function_name": "test"}' 0.0.0.0:8002
indexer.IndexerManager.Enable`
- disable: `grpcurl -plaintext -proto proto/indexer_manager.proto -d
'{"account_id": "morgs.near", "function_name": "test"}' 0.0.0.0:8002
indexer.IndexerManager.Disable`
- list: `grpcurl -plaintext -proto proto/indexer_manager.proto
0.0.0.0:8002 indexer.IndexerManager.List`
  • Loading branch information
morgsmccauley authored May 13, 2024
1 parent 702fe46 commit 94fe3b3
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 55 deletions.
2 changes: 2 additions & 0 deletions coordinator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
futures-util = "0.3.30"
prost = "0.12.3"
redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] }
tokio = "1.28"
tonic = "0.10.2"
Expand All @@ -22,5 +23,8 @@ near-jsonrpc-client = "0.8.0"
near-primitives = "0.20.0"
near-jsonrpc-primitives = "0.20.0"

[build-dependencies]
tonic-build = "0.10"

[dev-dependencies]
mockall = "0.11.4"
5 changes: 5 additions & 0 deletions coordinator/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/indexer_manager.proto")?;

Ok(())
}
47 changes: 47 additions & 0 deletions coordinator/proto/indexer_manager.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
syntax = "proto3";

package indexer;

// The IndexerManager service provides RPCs to manage Indexer instances
service IndexerManager {
// Re-enable an existing Indexer
rpc Enable (IndexerRequest) returns (EnableIndexerResponse);

// Disable an Indexer, preventing it from running
rpc Disable (IndexerRequest) returns (DisableIndexerResponse);

// List all Indexer with their state
rpc List (Empty) returns (ListIndexersResponse);
}

// Request message for managing Indexers
message IndexerRequest {
// Account ID which the indexer is defined under
string account_id = 1;
// Name of the indexer
string function_name = 2;
}

// Response message for enabling Indexer
message EnableIndexerResponse {
bool success = 1;
}

// Response message for disabling Indexer
message DisableIndexerResponse {
bool success = 1;
}

// Reponse message for listing Indexers
message ListIndexersResponse {
repeated IndexerState indexers = 1;
}

// Persisted state relevant to Indexer
message IndexerState {
string account_id = 1;
string function_name = 2;
bool enabled = 3;
}

message Empty {}
19 changes: 10 additions & 9 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ async fn synchronise_block_stream(
let start_block_height =
determine_start_block_height(&sync_status, indexer_config, redis_client).await?;

tracing::info!(
"Starting new block stream starting at block {}",
start_block_height
);

block_streams_handler
.start(start_block_height, indexer_config)
.await?;
Expand Down Expand Up @@ -136,16 +131,22 @@ async fn determine_start_block_height(
redis_client: &RedisClient,
) -> anyhow::Result<u64> {
if sync_status == &SyncStatus::Synced {
tracing::info!("Resuming block stream");
let height = get_continuation_block_height(indexer_config, redis_client).await?;

tracing::info!(height, "Resuming block stream");

return get_continuation_block_height(indexer_config, redis_client).await;
return Ok(height);
}

match indexer_config.start_block {
let height = match indexer_config.start_block {
StartBlock::Latest => Ok(indexer_config.get_registry_version()),
StartBlock::Height(height) => Ok(height),
StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await,
}
}?;

tracing::info!(height, "Starting block stream");

Ok(height)
}

async fn get_continuation_block_height(
Expand Down
Loading

0 comments on commit 94fe3b3

Please sign in to comment.