Skip to content

Commit

Permalink
feat: add bind configuration for the inter_daemon listener
Browse files Browse the repository at this point in the history
Ref: #459
  • Loading branch information
Michael-J-Ward committed Apr 12, 2024
1 parent 7f22e2d commit 0a4ccd1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
9 changes: 7 additions & 2 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::Ipv4Addr, path::PathBuf};
use std::{net::{IpAddr, Ipv4Addr}, path::PathBuf};

use attach::attach_dataflow;
use clap::Parser;
Expand Down Expand Up @@ -103,6 +103,10 @@ enum Command {
Daemon {
#[clap(long)]
machine_id: Option<String>,
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
bind: SocketAddr,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

Expand Down Expand Up @@ -280,6 +284,7 @@ fn run() -> eyre::Result<()> {
}
Command::Daemon {
coordinator_addr,
bind,
machine_id,
run_dataflow,
} => {
Expand All @@ -306,7 +311,7 @@ fn run() -> eyre::Result<()> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(addr, machine_id.unwrap_or_default()).await
Daemon::run(addr, machine_id.unwrap_or_default(), bind).await
}
}
})
Expand Down
6 changes: 3 additions & 3 deletions binaries/daemon/src/inter_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use eyre::{Context, ContextCompat};
use std::{
collections::BTreeMap,
io::ErrorKind,
net::{Ipv4Addr, SocketAddr},
net::SocketAddr,
};
use tokio::net::{TcpListener, TcpStream};

Expand Down Expand Up @@ -65,11 +65,11 @@ pub async fn send_inter_daemon_event(
}

pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
Expand Down
4 changes: 2 additions & 2 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ pub struct Daemon {
}

impl Daemon {
pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> {
pub async fn run(coordinator_addr: SocketAddr, machine_id: String, bind_addr: SocketAddr) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?;
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
Expand Down

0 comments on commit 0a4ccd1

Please sign in to comment.