Skip to content

Commit

Permalink
Adding long arg for grace duration and adding more grace duration wit…
Browse files Browse the repository at this point in the history
…hin the CI
  • Loading branch information
haixuanTao committed Apr 22, 2024
1 parent 06f0bfc commit 20afeb7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ jobs:
cargo build --all
dora start dataflow.yml --name ci-rust-test
sleep 10
dora stop --name ci-rust-test
dora stop --name ci-rust-test --grace-duration 5s
cd ..
# Test Python template Project
Expand All @@ -282,7 +282,7 @@ jobs:
cd test_python_project
dora start dataflow.yml --name ci-python-test
sleep 10
dora stop --name ci-python-test
dora stop --name ci-python-test --grace-duration 5s
cd ..
dora destroy
Expand Down
1 change: 1 addition & 0 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ enum Command {
uuid: Option<Uuid>,
#[clap(long)]
name: Option<String>,
#[clap(long)]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
},
Expand Down
25 changes: 17 additions & 8 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Daemon {
}
Event::CtrlC => {
for dataflow in self.running.values_mut() {
dataflow.stop_all(&self.clock, None).await;
dataflow.stop_all(&self.clock, None, None).await;
}
}
}
Expand Down Expand Up @@ -438,11 +438,9 @@ impl Daemon {
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
// .stop_all(&self.clock.clone(), grace_duration);
dataflow.stop_all(&self.clock, grace_duration).await;
let reply = DaemonCoordinatorReply::StopResult(Ok(()));
let _ = reply_tx
.send(Some(reply))
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));
dataflow
.stop_all(&self.clock, grace_duration, Some(reply_tx))
.await;
RunStatus::Continue
}
DaemonCoordinatorEvent::Destroy => {
Expand Down Expand Up @@ -1429,7 +1427,12 @@ impl RunningDataflow {
Ok(())
}

async fn stop_all(&mut self, clock: &HLC, grace_duration: Option<Duration>) {
async fn stop_all(
&mut self,
clock: &HLC,
grace_duration: Option<Duration>,
reply_tx: Option<Sender<Option<DaemonCoordinatorReply>>>,
) {
for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock);
}
Expand All @@ -1450,8 +1453,14 @@ impl RunningDataflow {
)
}
}
});

let reply = DaemonCoordinatorReply::StopResult(Ok(()));
if let Some(tx) = reply_tx {
let _ = tx
.send(Some(reply))
.map_err(|_| error!("could not send stop reply from daemon to coordinator"));
}
});
self.stop_sent = true;
}

Expand Down

0 comments on commit 20afeb7

Please sign in to comment.