diff --git a/packages/infra/client/isolate-v8-runner/src/main.rs b/packages/infra/client/isolate-v8-runner/src/main.rs index a8a4f78ab..45bf98b7b 100644 --- a/packages/infra/client/isolate-v8-runner/src/main.rs +++ b/packages/infra/client/isolate-v8-runner/src/main.rs @@ -24,7 +24,6 @@ mod throttle; mod utils; /// Manager port to connect to. -const RUNNER_PORT: u16 = 54321; const THREAD_STATUS_POLL: Duration = Duration::from_millis(500); #[tokio::main(flavor = "current_thread")] @@ -45,6 +44,7 @@ async fn main() -> Result<()> { .await?; let actors_path = var("ACTORS_PATH")?; + let runner_addr = var("RUNNER_ADDR")?; let actors_path = Path::new(&actors_path); // Explicitly start runtime on current thread @@ -56,7 +56,7 @@ async fn main() -> Result<()> { let (fatal_tx, mut fatal_rx) = tokio::sync::mpsc::channel::<()>(1); let res = tokio::select! { - res = retry_connection(actors_path, &mut actors, fatal_tx) => res, + res = retry_connection(actors_path, &runner_addr, &mut actors, fatal_tx) => res, // If any fatal error occurs in the isolate threads, kill the entire program _ = fatal_rx.recv() => Err(anyhow!("Fatal error")), }; @@ -70,12 +70,13 @@ async fn main() -> Result<()> { async fn retry_connection( actors_path: &Path, + runner_addr: &str, actors: &mut HashMap>, fatal_tx: mpsc::Sender<()>, ) -> Result<()> { loop { use std::result::Result::{Err, Ok}; - match tokio_tungstenite::connect_async(format!("ws://0.0.0.0:{RUNNER_PORT}")).await { + match tokio_tungstenite::connect_async(format!("ws://{runner_addr}")).await { Ok((socket, _)) => { handle_connection(actors_path, actors, fatal_tx.clone(), socket).await? } diff --git a/packages/infra/client/manager/src/config.rs b/packages/infra/client/manager/src/config.rs index 1c5745d93..4393746cc 100644 --- a/packages/infra/client/manager/src/config.rs +++ b/packages/infra/client/manager/src/config.rs @@ -54,6 +54,8 @@ pub struct Client { #[serde(default)] pub logs: Logs, #[serde(default)] + pub metrics: Metrics, + #[serde(default)] pub vector: Option, } @@ -99,6 +101,15 @@ impl Runtime { #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct Actor { pub network: ActorNetwork, + + /// WebSocket Port for runners on this machine to connect to. + pub runner_port: Option, +} + +impl Actor { + pub fn runner_port(&self) -> u16 { + self.runner_port.unwrap_or(54321) + } } #[derive(Clone, Deserialize)] @@ -204,6 +215,18 @@ impl Logs { } } +#[derive(Clone, Deserialize, Default)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub struct Metrics { + pub port: Option, +} + +impl Metrics { + pub fn port(&self) -> u16 { + self.port.unwrap_or(6000) + } +} + #[derive(Clone, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct Vector { diff --git a/packages/infra/client/manager/src/ctx.rs b/packages/infra/client/manager/src/ctx.rs index 9b5f80722..8f0eb0bf9 100644 --- a/packages/infra/client/manager/src/ctx.rs +++ b/packages/infra/client/manager/src/ctx.rs @@ -20,8 +20,6 @@ use uuid::Uuid; use crate::{actor::Actor, config::Config, metrics, runner, utils}; const PING_INTERVAL: Duration = Duration::from_secs(1); -/// TCP Port for runners to connect to. -const RUNNER_PORT: u16 = 54321; #[derive(thiserror::Error, Debug)] pub enum RuntimeError { @@ -170,9 +168,9 @@ impl Ctx { // Start runner socket let self2 = self.clone(); let runner_socket = tokio::spawn(async move { - tracing::warn!(port=%RUNNER_PORT, "listening for runner sockets"); + tracing::warn!(port=%self2.config().actor.runner_port(), "listening for runner sockets"); - let listener = TcpListener::bind(("0.0.0.0", RUNNER_PORT)) + let listener = TcpListener::bind(("0.0.0.0", self2.config().actor.runner_port())) .await .map_err(RuntimeError::RunnerSocketListenFailed)?; @@ -346,7 +344,7 @@ impl Ctx { let env = vec![( "ACTORS_PATH", self.actors_path().to_str().context("bad path")?.to_string(), - )]; + ), ("RUNNER_ADDR", format!("127.0.0.1:{}", self.config().actor.runner_port()))]; let working_path = self.isolate_runner_path(); let runner = runner::Handle::spawn_orphaned( diff --git a/packages/infra/client/manager/src/main.rs b/packages/infra/client/manager/src/main.rs index 364f76a71..5a143e1e0 100644 --- a/packages/infra/client/manager/src/main.rs +++ b/packages/infra/client/manager/src/main.rs @@ -147,7 +147,7 @@ async fn init() -> Result { async fn run(init: Init) -> Result<()> { // Start metrics server - let metrics_thread = tokio::spawn(metrics::run_standalone()); + let metrics_thread = tokio::spawn(metrics::run_standalone(init.config.client.metrics.port())); tracing::info!("connecting to ws: {}", &init.url); diff --git a/packages/infra/client/manager/src/metrics/server.rs b/packages/infra/client/manager/src/metrics/server.rs index 2ea8b4fb6..b64a6ce82 100644 --- a/packages/infra/client/manager/src/metrics/server.rs +++ b/packages/infra/client/manager/src/metrics/server.rs @@ -7,11 +7,9 @@ use hyper::{ }; use prometheus::{Encoder, TextEncoder}; -const METRICS_PORT: u16 = 6000; - #[tracing::instrument(skip_all)] -pub async fn run_standalone() -> anyhow::Result<()> { - let addr = SocketAddr::from(([0, 0, 0, 0], METRICS_PORT)); +pub async fn run_standalone(port: u16) -> anyhow::Result<()> { + let addr = SocketAddr::from(([0, 0, 0, 0], port)); let server = Server::try_bind(&addr)?;