Skip to content

Commit

Permalink
chore: allow configuring isolate runner port (#1366)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
NathanFlurry committed Nov 18, 2024
1 parent b1d1156 commit 746a2f1
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 13 deletions.
7 changes: 4 additions & 3 deletions packages/infra/client/isolate-v8-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ mod throttle;
mod utils;

/// Manager port to connect to.
const RUNNER_PORT: u16 = 54321;
const THREAD_STATUS_POLL: Duration = Duration::from_millis(500);

#[tokio::main(flavor = "current_thread")]
Expand All @@ -45,6 +44,7 @@ async fn main() -> Result<()> {
.await?;

let actors_path = var("ACTORS_PATH")?;
let runner_addr = var("RUNNER_ADDR")?;
let actors_path = Path::new(&actors_path);

// Explicitly start runtime on current thread
Expand All @@ -56,7 +56,7 @@ async fn main() -> Result<()> {
let (fatal_tx, mut fatal_rx) = tokio::sync::mpsc::channel::<()>(1);

let res = tokio::select! {
res = retry_connection(actors_path, &mut actors, fatal_tx) => res,
res = retry_connection(actors_path, &runner_addr, &mut actors, fatal_tx) => res,
// If any fatal error occurs in the isolate threads, kill the entire program
_ = fatal_rx.recv() => Err(anyhow!("Fatal error")),
};
Expand All @@ -70,12 +70,13 @@ async fn main() -> Result<()> {

async fn retry_connection(
actors_path: &Path,
runner_addr: &str,
actors: &mut HashMap<Uuid, watch::Sender<()>>,
fatal_tx: mpsc::Sender<()>,
) -> Result<()> {
loop {
use std::result::Result::{Err, Ok};
match tokio_tungstenite::connect_async(format!("ws://0.0.0.0:{RUNNER_PORT}")).await {
match tokio_tungstenite::connect_async(format!("ws://{runner_addr}")).await {
Ok((socket, _)) => {
handle_connection(actors_path, actors, fatal_tx.clone(), socket).await?
}
Expand Down
23 changes: 23 additions & 0 deletions packages/infra/client/manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct Client {
#[serde(default)]
pub logs: Logs,
#[serde(default)]
pub metrics: Metrics,
#[serde(default)]
pub vector: Option<Vector>,
}

Expand Down Expand Up @@ -99,6 +101,15 @@ impl Runtime {
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Actor {
pub network: ActorNetwork,

/// WebSocket Port for runners on this machine to connect to.
pub runner_port: Option<u16>,
}

impl Actor {
pub fn runner_port(&self) -> u16 {
self.runner_port.unwrap_or(54321)
}
}

#[derive(Clone, Deserialize)]
Expand Down Expand Up @@ -204,6 +215,18 @@ impl Logs {
}
}

#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Metrics {
pub port: Option<u16>,
}

impl Metrics {
pub fn port(&self) -> u16 {
self.port.unwrap_or(6000)
}
}

#[derive(Clone, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Vector {
Expand Down
8 changes: 3 additions & 5 deletions packages/infra/client/manager/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use uuid::Uuid;
use crate::{actor::Actor, config::Config, metrics, runner, utils};

const PING_INTERVAL: Duration = Duration::from_secs(1);
/// TCP Port for runners to connect to.
const RUNNER_PORT: u16 = 54321;

#[derive(thiserror::Error, Debug)]
pub enum RuntimeError {
Expand Down Expand Up @@ -170,9 +168,9 @@ impl Ctx {
// Start runner socket
let self2 = self.clone();
let runner_socket = tokio::spawn(async move {
tracing::warn!(port=%RUNNER_PORT, "listening for runner sockets");
tracing::warn!(port=%self2.config().actor.runner_port(), "listening for runner sockets");

let listener = TcpListener::bind(("0.0.0.0", RUNNER_PORT))
let listener = TcpListener::bind(("0.0.0.0", self2.config().actor.runner_port()))
.await
.map_err(RuntimeError::RunnerSocketListenFailed)?;

Expand Down Expand Up @@ -346,7 +344,7 @@ impl Ctx {
let env = vec![(
"ACTORS_PATH",
self.actors_path().to_str().context("bad path")?.to_string(),
)];
), ("RUNNER_ADDR", format!("127.0.0.1:{}", self.config().actor.runner_port()))];
let working_path = self.isolate_runner_path();

let runner = runner::Handle::spawn_orphaned(
Expand Down
2 changes: 1 addition & 1 deletion packages/infra/client/manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn init() -> Result<Init> {

async fn run(init: Init) -> Result<()> {
// Start metrics server
let metrics_thread = tokio::spawn(metrics::run_standalone());
let metrics_thread = tokio::spawn(metrics::run_standalone(init.config.client.metrics.port()));

tracing::info!("connecting to ws: {}", &init.url);

Expand Down
6 changes: 2 additions & 4 deletions packages/infra/client/manager/src/metrics/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ use hyper::{
};
use prometheus::{Encoder, TextEncoder};

const METRICS_PORT: u16 = 6000;

#[tracing::instrument(skip_all)]
pub async fn run_standalone() -> anyhow::Result<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], METRICS_PORT));
pub async fn run_standalone(port: u16) -> anyhow::Result<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], port));

let server = Server::try_bind(&addr)?;

Expand Down

0 comments on commit 746a2f1

Please sign in to comment.