Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expose RPC from Coordinator to enable/disable Indexers #729

Merged
merged 14 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions coordinator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
futures-util = "0.3.30"
prost = "0.12.3"
redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] }
tokio = "1.28"
tonic = "0.10.2"
Expand All @@ -22,5 +23,8 @@ near-jsonrpc-client = "0.8.0"
near-primitives = "0.20.0"
near-jsonrpc-primitives = "0.20.0"

[build-dependencies]
tonic-build = "0.10"

[dev-dependencies]
mockall = "0.11.4"
5 changes: 5 additions & 0 deletions coordinator/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/indexer_manager.proto")?;

Ok(())
}
47 changes: 47 additions & 0 deletions coordinator/proto/indexer_manager.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
syntax = "proto3";

package indexer;

// The IndexerManager service provides RPCs to manage Indexer instances
service IndexerManager {
// Re-enable an existing Indexer
rpc Enable (IndexerRequest) returns (EnableIndexerResponse);

// Disable an Indexer, preventing it from running
rpc Disable (IndexerRequest) returns (DisableIndexerResponse);

// List all Indexer with their state
rpc List (Empty) returns (ListIndexersResponse);
}

// Request message for managing Indexers
message IndexerRequest {
// Account ID which the indexer is defined under
string account_id = 1;
// Name of the indexer
string function_name = 2;
}

// Response message for enabling Indexer
message EnableIndexerResponse {
bool success = 1;
}

// Response message for disabling Indexer
message DisableIndexerResponse {
bool success = 1;
}

// Reponse message for listing Indexers
message ListIndexersResponse {
repeated IndexerState indexers = 1;
}

// Persisted state relevant to Indexer
message IndexerState {
string account_id = 1;
string function_name = 2;
bool enabled = 3;
}

message Empty {}
19 changes: 10 additions & 9 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ async fn synchronise_block_stream(
let start_block_height =
determine_start_block_height(&sync_status, indexer_config, redis_client).await?;

tracing::info!(
"Starting new block stream starting at block {}",
start_block_height
);

block_streams_handler
.start(start_block_height, indexer_config)
.await?;
Expand Down Expand Up @@ -136,16 +131,22 @@ async fn determine_start_block_height(
redis_client: &RedisClient,
) -> anyhow::Result<u64> {
if sync_status == &SyncStatus::Synced {
tracing::info!("Resuming block stream");
let height = get_continuation_block_height(indexer_config, redis_client).await?;

tracing::info!(height, "Resuming block stream");

return get_continuation_block_height(indexer_config, redis_client).await;
return Ok(height);
}

match indexer_config.start_block {
let height = match indexer_config.start_block {
StartBlock::Latest => Ok(indexer_config.get_registry_version()),
StartBlock::Height(height) => Ok(height),
StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await,
}
}?;

tracing::info!(height, "Starting block stream");

Ok(height)
}

async fn get_continuation_block_height(
Expand Down
Loading
Loading