From 71adb50866b71b04ddbecd27808d72d819a35823 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 12 Apr 2024 15:48:01 -0500 Subject: [PATCH] feat: add bind configuration for coordination daemon Ref: #459 --- binaries/cli/src/main.rs | 11 ++++++++--- binaries/coordinator/src/lib.rs | 6 ++---- binaries/coordinator/src/listener.rs | 7 +++---- examples/multiple-daemons/run.rs | 7 ++++--- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 345931d31..405ef7f4c 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -116,7 +116,12 @@ enum Command { /// Run runtime Runtime, /// Run coordinator - Coordinator { port: Option }, + Coordinator { + #[clap(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) + )] + bind: SocketAddr, + }, } #[derive(Debug, clap::Args)] @@ -270,14 +275,14 @@ fn run() -> eyre::Result<()> { } } Command::Destroy { config } => up::destroy(config.as_deref())?, - Command::Coordinator { port } => { + Command::Coordinator { bind } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { let (_port, task) = - dora_coordinator::start(port, futures::stream::empty::()).await?; + dora_coordinator::start(bind, futures::stream::empty::()).await?; task.await }) .context("failed to run dora-coordinator")? diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3d6be47d7..af53c6ea3 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -11,7 +11,6 @@ use dora_core::{ message::uhlc::{self, HLC}, topics::{ control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_DEFAULT, }, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; @@ -39,11 +38,10 @@ mod run; mod tcp_utils; pub async fn start( - port: Option, + bind: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { - let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); - let listener = listener::create_listener(port).await?; + let listener = listener::create_listener(bind).await?; let port = listener .local_addr() .wrap_err("failed to get local addr of listener")? diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index ee78755fd..824d1168c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,15 +1,14 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc}; +use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, sync::mpsc, }; -pub async fn create_listener(port: u16) -> eyre::Result { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, port)).await { +pub async fn create_listener(bind: SocketAddr) -> eyre::Result { + let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index bd0722135..9a7b30b08 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,14 +1,14 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; use std::{ collections::BTreeSet, - net::{Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, time::Duration, }; @@ -34,8 +34,9 @@ async fn main() -> eyre::Result<()> { build_dataflow(dataflow).await?; let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); + let coordinator_bind = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT); let (coordinator_port, coordinator) = - dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; + dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)).await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());