From f6590224b4d32476032b87e5e541d7f5605fd6bc Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 12 Apr 2024 15:29:59 -0500 Subject: [PATCH 01/11] feat: add bind configuration for the inter_daemon listener Ref: #459 --- binaries/cli/src/main.rs | 12 ++++++++++-- binaries/daemon/src/inter_daemon.rs | 10 +++------- binaries/daemon/src/lib.rs | 8 ++++++-- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index e6b5f757b..4f2c1623e 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,4 +1,7 @@ -use std::{net::Ipv4Addr, path::PathBuf}; +use std::{ + net::{IpAddr, Ipv4Addr}, + path::PathBuf, +}; use attach::attach_dataflow; use clap::Parser; @@ -103,6 +106,10 @@ enum Command { Daemon { #[clap(long)] machine_id: Option, + #[clap(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) + )] + bind: SocketAddr, #[clap(long)] coordinator_addr: Option, @@ -280,6 +287,7 @@ fn run() -> eyre::Result<()> { } Command::Daemon { coordinator_addr, + bind, machine_id, run_dataflow, } => { @@ -306,7 +314,7 @@ fn run() -> eyre::Result<()> { let localhost = Ipv4Addr::new(127, 0, 0, 1); (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() }); - Daemon::run(addr, machine_id.unwrap_or_default()).await + Daemon::run(addr, machine_id.unwrap_or_default(), bind).await } } }) diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index 40a8a34b3..ed771e961 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -1,11 +1,7 @@ use crate::tcp_utils::{tcp_receive, tcp_send}; use dora_core::daemon_messages::{InterDaemonEvent, Timestamped}; use eyre::{Context, ContextCompat}; -use std::{ - collections::BTreeMap, - io::ErrorKind, - net::{Ipv4Addr, SocketAddr}, -}; +use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr}; use tokio::net::{TcpListener, TcpStream}; pub struct InterDaemonConnection { @@ -65,11 
+61,11 @@ pub async fn send_inter_daemon_event( } pub async fn spawn_listener_loop( + bind: SocketAddr, machine_id: String, events_tx: flume::Sender>, ) -> eyre::Result { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, 0)).await { + let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 9865d0eb9..857ecd17e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -77,7 +77,11 @@ pub struct Daemon { } impl Daemon { - pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> { + pub async fn run( + coordinator_addr: SocketAddr, + machine_id: String, + bind_addr: SocketAddr, + ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; @@ -85,7 +89,7 @@ impl Daemon { // spawn listen loop let (events_tx, events_rx) = flume::bounded(10); let listen_socket = - inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?; + inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; let daemon_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), timestamp: e.timestamp, From 3a6323ff0cf9aef613792e66f2a202c27908920c Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 12 Apr 2024 15:48:01 -0500 Subject: [PATCH 02/11] feat: add bind configuration for coordination daemon Ref: #459 --- binaries/cli/src/main.rs | 11 ++++++++--- binaries/coordinator/src/lib.rs | 10 +++------- binaries/coordinator/src/listener.rs | 7 +++---- examples/multiple-daemons/run.rs | 11 ++++++++--- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 4f2c1623e..f02482fd6 100644 --- a/binaries/cli/src/main.rs +++ 
b/binaries/cli/src/main.rs @@ -119,7 +119,12 @@ enum Command { /// Run runtime Runtime, /// Run coordinator - Coordinator { port: Option }, + Coordinator { + #[clap(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) + )] + bind: SocketAddr, + }, } #[derive(Debug, clap::Args)] @@ -273,14 +278,14 @@ fn run() -> eyre::Result<()> { } } Command::Destroy { config } => up::destroy(config.as_deref())?, - Command::Coordinator { port } => { + Command::Coordinator { bind } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { let (_port, task) = - dora_coordinator::start(port, futures::stream::empty::()).await?; + dora_coordinator::start(bind, futures::stream::empty::()).await?; task.await }) .context("failed to run dora-coordinator")? diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3d6be47d7..1014ea152 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,10 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_DEFAULT, - }, + topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -39,11 +36,10 @@ mod run; mod tcp_utils; pub async fn start( - port: Option, + bind: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { - let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); - let listener = listener::create_listener(port).await?; + let listener = listener::create_listener(bind).await?; let port = listener .local_addr() .wrap_err("failed to get local addr of 
listener")? diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index ee78755fd..824d1168c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,15 +1,14 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc}; +use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, sync::mpsc, }; -pub async fn create_listener(port: u16) -> eyre::Result { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, port)).await { +pub async fn create_listener(bind: SocketAddr) -> eyre::Result { + let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index bd0722135..95f64de70 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,14 +1,14 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; use std::{ collections::BTreeSet, - net::{Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, time::Duration, }; @@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> { build_dataflow(dataflow).await?; let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); + let coordinator_bind = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + DORA_COORDINATOR_PORT_DEFAULT, + ); let (coordinator_port, coordinator) = - 
dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; + dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) + .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); From 58eac2902479b1e304a89a0775adf90f91b8b75c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 16 Apr 2024 17:54:01 +0200 Subject: [PATCH 03/11] Increase log level of 'disconnecting daemons' message to `error` --- binaries/coordinator/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1014ea152..28644886f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -496,7 +496,7 @@ async fn start_inner( } } if !disconnected.is_empty() { - tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}"); + tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}"); for machine_id in disconnected { daemon_connections.remove(&machine_id); } From 279530e21dca7cfa4d86d6241753a2ec64dcc8fa Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 16 Apr 2024 17:57:10 +0200 Subject: [PATCH 04/11] Delay watchdog disconnect to 30 seconds --- binaries/coordinator/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 28644886f..c7c303d14 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -477,6 +477,12 @@ async fn start_inner( let mut disconnected = BTreeSet::new(); for (machine_id, connection) in &mut daemon_connections { if connection.last_heartbeat.elapsed() > Duration::from_secs(15) { + tracing::warn!( + "no heartbeat message from machine `{machine_id}` since {:?}", + 
connection.last_heartbeat.elapsed() + ) + } + if connection.last_heartbeat.elapsed() > Duration::from_secs(30) { disconnected.insert(machine_id.clone()); continue; } From 1ba763cd6782acd37a9de07eae135cc7d8b6ea2b Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 16 Apr 2024 12:28:44 -0500 Subject: [PATCH 05/11] update dora_coordinator::start to return bound addr instead of just the port --- binaries/coordinator/src/lib.rs | 9 ++++----- examples/multiple-daemons/run.rs | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c7c303d14..eb6f8efcd 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -38,12 +38,11 @@ mod tcp_utils; pub async fn start( bind: SocketAddr, external_events: impl Stream + Unpin, -) -> Result<(u16, impl Future>), eyre::ErrReport> { +) -> Result<(SocketAddr, impl Future>), eyre::ErrReport> { let listener = listener::create_listener(bind).await?; - let port = listener + let bound_addr = listener .local_addr() - .wrap_err("failed to get local addr of listener")? - .port(); + .wrap_err("failed to get local addr of listener")?; let mut tasks = FuturesUnordered::new(); // Setup ctrl-c handler @@ -61,7 +60,7 @@ pub async fn start( tracing::debug!("all spawned tasks finished, exiting.."); Ok(()) }; - Ok((port, future)) + Ok((bound_addr, future)) } // Resolve the dataflow name. 
diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 95f64de70..654509a9e 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -38,10 +38,9 @@ async fn main() -> eyre::Result<()> { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT, ); - let (coordinator_port, coordinator) = + let (coordinator_addr, coordinator) = dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) .await?; - let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); From 4af82a4833bea5f277ab7c6f392ddd03a4b9380f Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 16 Apr 2024 15:38:54 -0500 Subject: [PATCH 06/11] revert to daemons listening to 127.0.0.1 by default clients can not connect to `0.0.0.0` on windows. --- binaries/cli/src/main.rs | 2 +- binaries/daemon/src/inter_daemon.rs | 1 + examples/multiple-daemons/run.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f02482fd6..93db5eb23 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -107,7 +107,7 @@ enum Command { #[clap(long)] machine_id: Option, #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0) )] bind: SocketAddr, #[clap(long)] diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index ed771e961..75bf629cf 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -75,6 +75,7 @@ pub async fn spawn_listener_loop( .local_addr() .wrap_err("failed to get local addr of socket")?; + tracing::debug!("inter-daemon listener starting for machine `{machine_id}` on {socket_addr}"); tokio::spawn(async move { 
listener_loop(socket, events_tx).await; tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`"); diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 654509a9e..3118795b4 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -35,7 +35,7 @@ async fn main() -> eyre::Result<()> { let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); let coordinator_bind = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DORA_COORDINATOR_PORT_DEFAULT, ); let (coordinator_addr, coordinator) = From e76f497939c66e937ddc820674455f91e8ece2af Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 17 Apr 2024 14:58:51 +0200 Subject: [PATCH 07/11] Revert "revert to daemons listening to 127.0.0.1 by default" This reverts commit 4af82a4833bea5f277ab7c6f392ddd03a4b9380f. --- binaries/cli/src/main.rs | 2 +- binaries/daemon/src/inter_daemon.rs | 1 - examples/multiple-daemons/run.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 93db5eb23..f02482fd6 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -107,7 +107,7 @@ enum Command { #[clap(long)] machine_id: Option, #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0) + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) )] bind: SocketAddr, #[clap(long)] diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index 75bf629cf..ed771e961 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -75,7 +75,6 @@ pub async fn spawn_listener_loop( .local_addr() .wrap_err("failed to get local addr of socket")?; - tracing::debug!("inter-daemon listener starting for machine `{machine_id}` on {socket_addr}"); tokio::spawn(async move { listener_loop(socket, events_tx).await; 
tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`"); diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 3118795b4..654509a9e 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -35,7 +35,7 @@ async fn main() -> eyre::Result<()> { let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); let coordinator_bind = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT, ); let (coordinator_addr, coordinator) = From 6115ddbb4dd93d5eb7b50cca399e57d996ef8372 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 17 Apr 2024 14:59:05 +0200 Subject: [PATCH 08/11] Revert "update dora_coordinator::start to return bound addr instead of just the port" This reverts commit 1ba763cd6782acd37a9de07eae135cc7d8b6ea2b. --- binaries/coordinator/src/lib.rs | 9 +++++---- examples/multiple-daemons/run.rs | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index eb6f8efcd..c7c303d14 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -38,11 +38,12 @@ mod tcp_utils; pub async fn start( bind: SocketAddr, external_events: impl Stream + Unpin, -) -> Result<(SocketAddr, impl Future>), eyre::ErrReport> { +) -> Result<(u16, impl Future>), eyre::ErrReport> { let listener = listener::create_listener(bind).await?; - let bound_addr = listener + let port = listener .local_addr() - .wrap_err("failed to get local addr of listener")?; + .wrap_err("failed to get local addr of listener")? + .port(); let mut tasks = FuturesUnordered::new(); // Setup ctrl-c handler @@ -60,7 +61,7 @@ pub async fn start( tracing::debug!("all spawned tasks finished, exiting.."); Ok(()) }; - Ok((bound_addr, future)) + Ok((port, future)) } // Resolve the dataflow name. 
diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 654509a9e..95f64de70 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -38,9 +38,10 @@ async fn main() -> eyre::Result<()> { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT, ); - let (coordinator_addr, coordinator) = + let (coordinator_port, coordinator) = dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) .await?; + let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); From 01707fc14debcafde0ef86ea03720bd25fe511c8 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 17 Apr 2024 15:02:49 +0200 Subject: [PATCH 09/11] Fix: Use `peer_addr` of incoming daemon register request for listen socket We are not interested in the local bind address of the daemon. Instead, we want to use the IP address under which the daemon is available from other machines. This should also avoid the issue that connecting to 0.0.0.0 is not possible on Windows (we want to use 0.0.0.0 as default bind address). 
--- binaries/coordinator/src/lib.rs | 37 ++++++++++++++-------- binaries/coordinator/src/listener.rs | 4 +-- binaries/daemon/src/coordinator.rs | 4 +-- binaries/daemon/src/inter_daemon.rs | 9 +++--- binaries/daemon/src/lib.rs | 4 +-- libraries/core/src/coordinator_messages.rs | 3 +- 6 files changed, 35 insertions(+), 26 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c7c303d14..d9b77fa99 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -171,29 +171,38 @@ async fn start_inner( machine_id, mut connection, dora_version: daemon_version, - listen_socket, + listen_port, } => { - let coordinator_version = &env!("CARGO_PKG_VERSION"); - let reply = if &daemon_version == coordinator_version { - RegisterResult::Ok + let coordinator_version: &&str = &env!("CARGO_PKG_VERSION"); + let version_check = if &daemon_version == coordinator_version { + Ok(()) } else { - RegisterResult::Err(format!( + Err(format!( "version mismatch: daemon v{daemon_version} is \ - not compatible with coordinator v{coordinator_version}" + not compatible with coordinator v{coordinator_version}" )) }; - let reply = Timestamped { - inner: reply, + let peer_ip = connection + .peer_addr() + .map(|addr| addr.ip()) + .map_err(|err| format!("failed to get peer addr of connection: {err}")); + let register_result = version_check.and(peer_ip); + + let reply: Timestamped = Timestamped { + inner: match &register_result { + Ok(_) => RegisterResult::Ok, + Err(err) => RegisterResult::Err(err.clone()), + }, timestamp: clock.new_timestamp(), }; let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await; - match (reply.inner, send_result) { - (RegisterResult::Ok, Ok(())) => { + match (register_result, send_result) { + (Ok(ip), Ok(())) => { let previous = daemon_connections.insert( machine_id.clone(), DaemonConnection { stream: connection, - listen_socket, + listen_socket: (ip, listen_port).into(), last_heartbeat: 
Instant::now(), }, ); @@ -203,10 +212,10 @@ async fn start_inner( ); } } - (RegisterResult::Err(err), _) => { + (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); } - (RegisterResult::Ok, Err(err)) => { + (Ok(_), Err(err)) => { tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}"); } } @@ -907,7 +916,7 @@ pub enum DaemonEvent { dora_version: String, machine_id: String, connection: TcpStream, - listen_socket: SocketAddr, + listen_port: u16, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 824d1168c..86600a4be 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -52,13 +52,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Register { machine_id, dora_version, - listen_socket, + listen_port, } => { let event = DaemonEvent::Register { dora_version, machine_id, connection, - listen_socket, + listen_port, }; let _ = events_tx.send(Event::Daemon(event)).await; break; diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 27ae8e49d..d2f86b3cc 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -24,7 +24,7 @@ pub struct CoordinatorEvent { pub async fn register( addr: SocketAddr, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, clock: &HLC, ) -> eyre::Result>> { let mut stream = TcpStream::connect(addr) @@ -37,7 +37,7 @@ pub async fn register( inner: CoordinatorRequest::Register { dora_version: env!("CARGO_PKG_VERSION").to_owned(), machine_id, - listen_socket, + listen_port, }, timestamp: clock.new_timestamp(), })?; diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index ed771e961..7eb4b9485 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -64,23 +64,24 @@ pub async fn spawn_listener_loop( 
bind: SocketAddr, machine_id: String, events_tx: flume::Sender>, -) -> eyre::Result { +) -> eyre::Result { let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) } }; - let socket_addr = socket + let listen_port = socket .local_addr() - .wrap_err("failed to get local addr of socket")?; + .wrap_err("failed to get local addr of socket")? + .port(); tokio::spawn(async move { listener_loop(socket, events_tx).await; tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`"); }); - Ok(socket_addr) + Ok(listen_port) } async fn listener_loop( diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 857ecd17e..59addea18 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -88,7 +88,7 @@ impl Daemon { // spawn listen loop let (events_tx, events_rx) = flume::bounded(10); - let listen_socket = + let listen_port = inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; let daemon_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), @@ -97,7 +97,7 @@ impl Daemon { // connect to the coordinator let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock) + coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock) .await .wrap_err("failed to connect to dora-coordinator")? 
.map( diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index e83ee784e..38e9eae2e 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,13 +1,12 @@ use crate::daemon_messages::DataflowId; use eyre::eyre; -use std::net::SocketAddr; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum CoordinatorRequest { Register { dora_version: String, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, }, Event { machine_id: String, From a9bfafd7fc7348861291da44a9c5ea786deda8ea Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 17 Apr 2024 10:20:14 -0500 Subject: [PATCH 10/11] refactor: use `addr` instead of `bind` to configure daemon listen address --- binaries/cli/src/main.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f02482fd6..6af248586 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -106,10 +106,11 @@ enum Command { Daemon { #[clap(long)] machine_id: Option, + /// The IP address and port this daemon will bind to. 
#[clap(long, default_value_t = SocketAddr::new( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) )] - bind: SocketAddr, + addr: SocketAddr, #[clap(long)] coordinator_addr: Option, @@ -292,7 +293,7 @@ fn run() -> eyre::Result<()> { } Command::Daemon { coordinator_addr, - bind, + addr, machine_id, run_dataflow, } => { @@ -314,12 +315,12 @@ fn run() -> eyre::Result<()> { Daemon::run_dataflow(&dataflow_path).await } None => { - let addr = coordinator_addr.unwrap_or_else(|| { + let coordination_addr = coordinator_addr.unwrap_or_else(|| { tracing::info!("Starting in local mode"); let localhost = Ipv4Addr::new(127, 0, 0, 1); (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() }); - Daemon::run(addr, machine_id.unwrap_or_default(), bind).await + Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await } } }) From 431d72ce4cc546c5df981119eb04487071fd3584 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Wed, 17 Apr 2024 10:27:52 -0500 Subject: [PATCH 11/11] refactor: rename coordinator listen port configuration to addr --- binaries/cli/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 6af248586..6cef5bced 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -124,7 +124,7 @@ enum Command { #[clap(long, default_value_t = SocketAddr::new( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) )] - bind: SocketAddr, + addr: SocketAddr, }, } @@ -279,14 +279,14 @@ fn run() -> eyre::Result<()> { } } Command::Destroy { config } => up::destroy(config.as_deref())?, - Command::Coordinator { bind } => { + Command::Coordinator { addr } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { let (_port, task) = - dora_coordinator::start(bind, futures::stream::empty::()).await?; + dora_coordinator::start(addr, futures::stream::empty::()).await?; task.await }) .context("failed to run 
dora-coordinator")?