Skip to content

Commit

Permalink
feat: add bind configuration for coordination daemon
Browse files Browse the repository at this point in the history
Ref: #459
  • Loading branch information
Michael-J-Ward committed Apr 16, 2024
1 parent f659022 commit 3a6323f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 17 deletions.
11 changes: 8 additions & 3 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ enum Command {
/// Run runtime
Runtime,
/// Run coordinator
Coordinator { port: Option<u16> },
Coordinator {
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT)
)]
bind: SocketAddr,
},
}

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -273,14 +278,14 @@ fn run() -> eyre::Result<()> {
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Command::Coordinator { port } => {
Command::Coordinator { bind } => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_port, task) =
dora_coordinator::start(port, futures::stream::empty::<Event>()).await?;
dora_coordinator::start(bind, futures::stream::empty::<Event>()).await?;
task.await
})
.context("failed to run dora-coordinator")?
Expand Down
10 changes: 3 additions & 7 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
},
topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
Expand All @@ -39,11 +36,10 @@ mod run;
mod tcp_utils;

pub async fn start(
port: Option<u16>,
bind: SocketAddr,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
let listener = listener::create_listener(bind).await?;
let port = listener
.local_addr()
.wrap_err("failed to get local addr of listener")?
Expand Down
7 changes: 3 additions & 4 deletions binaries/coordinator/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc};
use std::{io::ErrorKind, net::SocketAddr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};

pub async fn create_listener(port: u16) -> eyre::Result<TcpListener> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, port)).await {
pub async fn create_listener(bind: SocketAddr) -> eyre::Result<TcpListener> {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
Expand Down
11 changes: 8 additions & 3 deletions examples/multiple-daemons/run.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{ControlRequest, ControlRequestReply, DataflowId},
topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};

use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
time::Duration,
};
Expand All @@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> {
build_dataflow(dataflow).await?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_DEFAULT,
);
let (coordinator_port, coordinator) =
dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?;
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx))
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into());
let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());
Expand Down

0 comments on commit 3a6323f

Please sign in to comment.