diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index e6b5f757b..6cef5bced 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,4 +1,7 @@ -use std::{net::Ipv4Addr, path::PathBuf}; +use std::{ + net::{IpAddr, Ipv4Addr}, + path::PathBuf, +}; use attach::attach_dataflow; use clap::Parser; @@ -103,6 +106,11 @@ enum Command { Daemon { #[clap(long)] machine_id: Option, + /// The IP address and port this daemon will bind to. + #[clap(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) + )] + addr: SocketAddr, #[clap(long)] coordinator_addr: Option, @@ -112,7 +120,12 @@ enum Command { /// Run runtime Runtime, /// Run coordinator - Coordinator { port: Option }, + Coordinator { + #[clap(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) + )] + addr: SocketAddr, + }, } #[derive(Debug, clap::Args)] @@ -266,20 +279,21 @@ fn run() -> eyre::Result<()> { } } Command::Destroy { config } => up::destroy(config.as_deref())?, - Command::Coordinator { port } => { + Command::Coordinator { addr } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { let (_port, task) = - dora_coordinator::start(port, futures::stream::empty::()).await?; + dora_coordinator::start(addr, futures::stream::empty::()).await?; task.await }) .context("failed to run dora-coordinator")? 
} Command::Daemon { coordinator_addr, + addr, machine_id, run_dataflow, } => { @@ -301,12 +315,12 @@ fn run() -> eyre::Result<()> { Daemon::run_dataflow(&dataflow_path).await } None => { - let addr = coordinator_addr.unwrap_or_else(|| { + let coordination_addr = coordinator_addr.unwrap_or_else(|| { tracing::info!("Starting in local mode"); let localhost = Ipv4Addr::new(127, 0, 0, 1); (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() }); - Daemon::run(addr, machine_id.unwrap_or_default()).await + Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await } } }) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3d6be47d7..d9b77fa99 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,10 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_DEFAULT, - }, + topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -39,11 +36,10 @@ mod run; mod tcp_utils; pub async fn start( - port: Option, + bind: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { - let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); - let listener = listener::create_listener(port).await?; + let listener = listener::create_listener(bind).await?; let port = listener .local_addr() .wrap_err("failed to get local addr of listener")? 
@@ -175,29 +171,38 @@ async fn start_inner( machine_id, mut connection, dora_version: daemon_version, - listen_socket, + listen_port, } => { - let coordinator_version = &env!("CARGO_PKG_VERSION"); - let reply = if &daemon_version == coordinator_version { - RegisterResult::Ok + let coordinator_version: &&str = &env!("CARGO_PKG_VERSION"); + let version_check = if &daemon_version == coordinator_version { + Ok(()) } else { - RegisterResult::Err(format!( + Err(format!( "version mismatch: daemon v{daemon_version} is \ - not compatible with coordinator v{coordinator_version}" + not compatible with coordinator v{coordinator_version}" )) }; - let reply = Timestamped { - inner: reply, + let peer_ip = connection + .peer_addr() + .map(|addr| addr.ip()) + .map_err(|err| format!("failed to get peer addr of connection: {err}")); + let register_result = version_check.and(peer_ip); + + let reply: Timestamped = Timestamped { + inner: match &register_result { + Ok(_) => RegisterResult::Ok, + Err(err) => RegisterResult::Err(err.clone()), + }, timestamp: clock.new_timestamp(), }; let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await; - match (reply.inner, send_result) { - (RegisterResult::Ok, Ok(())) => { + match (register_result, send_result) { + (Ok(ip), Ok(())) => { let previous = daemon_connections.insert( machine_id.clone(), DaemonConnection { stream: connection, - listen_socket, + listen_socket: (ip, listen_port).into(), last_heartbeat: Instant::now(), }, ); @@ -207,10 +212,10 @@ async fn start_inner( ); } } - (RegisterResult::Err(err), _) => { + (Err(err), _) => { tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}"); } - (RegisterResult::Ok, Err(err)) => { + (Ok(_), Err(err)) => { tracing::warn!("failed to confirm daemon connection for machine `{machine_id}`: {err}"); } } @@ -481,6 +486,12 @@ async fn start_inner( let mut disconnected = BTreeSet::new(); for (machine_id, connection) in &mut daemon_connections { if
connection.last_heartbeat.elapsed() > Duration::from_secs(15) { + tracing::warn!( + "no heartbeat message from machine `{machine_id}` since {:?}", + connection.last_heartbeat.elapsed() + ) + } + if connection.last_heartbeat.elapsed() > Duration::from_secs(30) { disconnected.insert(machine_id.clone()); continue; } @@ -500,7 +511,7 @@ async fn start_inner( } } if !disconnected.is_empty() { - tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}"); + tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}"); for machine_id in disconnected { daemon_connections.remove(&machine_id); } @@ -905,7 +916,7 @@ pub enum DaemonEvent { dora_version: String, machine_id: String, connection: TcpStream, - listen_socket: SocketAddr, + listen_port: u16, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index ee78755fd..86600a4be 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,15 +1,14 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc}; +use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, sync::mpsc, }; -pub async fn create_listener(port: u16) -> eyre::Result { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, port)).await { +pub async fn create_listener(bind: SocketAddr) -> eyre::Result { + let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) @@ -53,13 +52,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Register { machine_id, dora_version, - listen_socket, + listen_port, } => { let event = DaemonEvent::Register { 
dora_version, machine_id, connection, - listen_socket, + listen_port, }; let _ = events_tx.send(Event::Daemon(event)).await; break; diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 27ae8e49d..d2f86b3cc 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -24,7 +24,7 @@ pub struct CoordinatorEvent { pub async fn register( addr: SocketAddr, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, clock: &HLC, ) -> eyre::Result>> { let mut stream = TcpStream::connect(addr) @@ -37,7 +37,7 @@ pub async fn register( inner: CoordinatorRequest::Register { dora_version: env!("CARGO_PKG_VERSION").to_owned(), machine_id, - listen_socket, + listen_port, }, timestamp: clock.new_timestamp(), })?; diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index 40a8a34b3..7eb4b9485 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -1,11 +1,7 @@ use crate::tcp_utils::{tcp_receive, tcp_send}; use dora_core::daemon_messages::{InterDaemonEvent, Timestamped}; use eyre::{Context, ContextCompat}; -use std::{ - collections::BTreeMap, - io::ErrorKind, - net::{Ipv4Addr, SocketAddr}, -}; +use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr}; use tokio::net::{TcpListener, TcpStream}; pub struct InterDaemonConnection { @@ -65,26 +61,27 @@ pub async fn send_inter_daemon_event( } pub async fn spawn_listener_loop( + bind: SocketAddr, machine_id: String, events_tx: flume::Sender>, -) -> eyre::Result { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, 0)).await { +) -> eyre::Result { + let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) } }; - let socket_addr = socket + let listen_port = socket .local_addr() - .wrap_err("failed to get local addr of socket")?; + 
.wrap_err("failed to get local addr of socket")? + .port(); tokio::spawn(async move { listener_loop(socket, events_tx).await; tracing::debug!("inter-daemon listener loop finished for machine `{machine_id}`"); }); - Ok(socket_addr) + Ok(listen_port) } async fn listener_loop( diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 9865d0eb9..59addea18 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -77,15 +77,19 @@ pub struct Daemon { } impl Daemon { - pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> { + pub async fn run( + coordinator_addr: SocketAddr, + machine_id: String, + bind_addr: SocketAddr, + ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; // spawn listen loop let (events_tx, events_rx) = flume::bounded(10); - let listen_socket = - inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?; + let listen_port = + inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; let daemon_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), timestamp: e.timestamp, @@ -93,7 +97,7 @@ impl Daemon { // connect to the coordinator let coordinator_events = - coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock) + coordinator::register(coordinator_addr, machine_id.clone(), listen_port, &clock) .await .wrap_err("failed to connect to dora-coordinator")? 
.map( diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index bd0722135..95f64de70 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,14 +1,14 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; use std::{ collections::BTreeSet, - net::{Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, time::Duration, }; @@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> { build_dataflow(dataflow).await?; let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); + let coordinator_bind = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + DORA_COORDINATOR_PORT_DEFAULT, + ); let (coordinator_port, coordinator) = - dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; + dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) + .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index e83ee784e..38e9eae2e 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,13 +1,12 @@ use crate::daemon_messages::DataflowId; use eyre::eyre; -use std::net::SocketAddr; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum CoordinatorRequest { Register { dora_version: String, machine_id: String, - listen_socket: SocketAddr, + listen_port: u16, }, Event { machine_id: String,