Skip to content

Commit

Permalink
feat: Create rust GRPC client for Runner (#491)
Browse files Browse the repository at this point in the history
This PR creates a rust based GRPC client for Runner, for use within
Coordinator V2. For now, this exists as its own crate within the
top-level directory. The Coordinator PR was becoming quite large so I
decided to separate this out.

Additionally, I've made a couple changes to the Runner proto:
- Rename the package: `spec` -> `runner`
- Remove `executorId` from `StartExecutorRequest` - it should be
deterministic and assigned internally
- Add `version` to the executor - this will be used to determine whether
an executor should be restarted
  • Loading branch information
morgsmccauley authored Jan 8, 2024
1 parent b486ced commit 7655aef
Show file tree
Hide file tree
Showing 14 changed files with 1,502 additions and 214 deletions.
1,212 changes: 1,212 additions & 0 deletions runner-client/Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions runner-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "runner"
version = "0.1.0"
edition = "2021"

[dependencies]
prost = "0.12.3"
tonic = "0.10.2"
tokio = { version = "1.28.0", features = ["full"]}

[build-dependencies]
tonic-build = "0.10"
5 changes: 5 additions & 0 deletions runner-client/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/runner.proto")?;

Ok(())
}
17 changes: 17 additions & 0 deletions runner-client/examples/list_executors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use tonic::Request;

use runner::runner_client::RunnerClient;
use runner::ListExecutorsRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = RunnerClient::connect("http://localhost:50007").await?;

let response = client
.list_executors(Request::new(ListExecutorsRequest {}))
.await?;

println!("{:#?}", response.into_inner());

Ok(())
}
23 changes: 23 additions & 0 deletions runner-client/examples/start_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use tonic::Request;

use runner::runner_client::RunnerClient;
use runner::StartExecutorRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = RunnerClient::connect("http://localhost:50007").await?;

let response = client
.start_executor(Request::new(StartExecutorRequest {
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
code: "console.log('hi')".to_string(),
schema: "CREATE TABLE blocks()".to_string(),
redis_stream: "morgs.near/test:block_stream".to_string(),
}))
.await?;

println!("{:#?}", response.into_inner());

Ok(())
}
21 changes: 21 additions & 0 deletions runner-client/examples/stop_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use tonic::Request;

use runner::runner_client::RunnerClient;
use runner::StopExecutorRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = RunnerClient::connect("http://localhost:50007").await?;

let response = client
.stop_executor(Request::new(StopExecutorRequest {
// Deterministic ID for morgs.near/test
executor_id: "be21b48c307671c1b3768ed84439f736c1cbbd77f815986354e855d44efd16e6"
.to_string(),
}))
.await?;

println!("{:#?}", response.into_inner());

Ok(())
}
59 changes: 59 additions & 0 deletions runner-client/proto/runner.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
syntax = "proto3";
package runner;

service Runner {
// Starts a new Runner executor
rpc StartExecutor (StartExecutorRequest) returns (StartExecutorResponse);

// Stops an existing Runner executor
rpc StopExecutor (StopExecutorRequest) returns (StopExecutorResponse);

// Lists all Runner executor
rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse);
}

// Start Executor Request
message StartExecutorRequest {
string redis_stream = 1;
string account_id = 2;
string function_name = 3;
string code = 4;
string schema = 5;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 6;
}

// Start Executor Response
message StartExecutorResponse {
string executor_id = 1;
}

// Stop Executor Request
message StopExecutorRequest {
string executor_id = 1;
}

// Stop Executor Response
message StopExecutorResponse {
string executor_id = 1;
}

// List Executor Request
message ListExecutorsRequest {
}

// List Executor Response
message ListExecutorsResponse {
// List of all executors, including stopped or crashed ones
repeated ExecutorInfo executors = 1;
}

// Information about a single BlockExecutor instance.
message ExecutorInfo {
string executor_id = 1;
string account_id = 2;
string function_name = 3;
string status = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
}
5 changes: 5 additions & 0 deletions runner-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod runner {
tonic::include_proto!("runner");
}

pub use runner::*;
17 changes: 10 additions & 7 deletions runner/protos/runner.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
syntax = "proto3";
package spec;
package runner;

service Runner {
// Starts a new Runner executor
Expand All @@ -14,12 +14,13 @@ service Runner {

// Start Executor Request
message StartExecutorRequest {
string executor_id = 1;
string redis_stream = 2;
string account_id = 3;
string function_name = 4;
string code = 5;
string schema = 6;
string redis_stream = 1;
string account_id = 2;
string function_name = 3;
string code = 4;
string schema = 5;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 6;
}

// Start Executor Response
Expand Down Expand Up @@ -53,4 +54,6 @@ message ExecutorInfo {
string account_id = 2;
string function_name = 3;
string status = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
}
15 changes: 0 additions & 15 deletions runner/src/server/runner-client.ts

This file was deleted.

2 changes: 1 addition & 1 deletion runner/src/server/runner-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default function startRunnerServer (): grpc.Server {
) as unknown) as ProtoGrpcType;

const server = new grpc.Server();
server.addService(runnerProto.spec.Runner.service, getRunnerService(StreamHandler));
server.addService(runnerProto.runner.Runner.service, getRunnerService(StreamHandler));
const credentials = grpc.ServerCredentials;

server.bindAsync(
Expand Down
Loading

0 comments on commit 7655aef

Please sign in to comment.