diff --git a/hydro_deploy/core/src/deployment.rs b/hydro_deploy/core/src/deployment.rs index 08daf28978d4..cb5c6aa04fc2 100644 --- a/hydro_deploy/core/src/deployment.rs +++ b/hydro_deploy/core/src/deployment.rs @@ -60,7 +60,7 @@ impl Deployment { pub async fn deploy(&mut self) -> Result<()> { self.services.retain(|weak| weak.strong_count() > 0); - progress::ProgressTracker::with_group("deploy", None, || async { + progress::ProgressTracker::with_group("deploy", Some(3), || async { let mut resource_batch = super::ResourceBatch::new(); for service in self.services.iter().filter_map(Weak::upgrade) { @@ -72,7 +72,7 @@ impl Deployment { } let resource_result = Arc::new( - progress::ProgressTracker::with_group("provision", None, || async { + progress::ProgressTracker::with_group("provision", Some(1), || async { resource_batch .provision(&mut self.resource_pool, self.last_resource_result.clone()) .await @@ -85,12 +85,16 @@ impl Deployment { host.provision(&resource_result); } - progress::ProgressTracker::with_group("deploy", None, || { - let services_future = self - .services + let upgraded_services = self + .services + .iter() + .filter_map(Weak::upgrade) + .collect::>(); + + progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || { + let services_future = upgraded_services .iter() - .filter_map(Weak::upgrade) - .map(|service: Arc>| { + .map(|service: &Arc>| { let resource_result = &resource_result; async move { service.write().await.deploy(resource_result).await } }) @@ -102,13 +106,14 @@ impl Deployment { }) .await?; - progress::ProgressTracker::with_group("ready", None, || { - let all_services_ready = self.services.iter().filter_map(Weak::upgrade).map( - |service: Arc>| async move { - service.write().await.ready().await?; - Ok(()) as Result<()> - }, - ); + progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || { + let all_services_ready = + upgraded_services + .iter() + .map(|service: &Arc>| async move { + service.write().await.ready().await?; + Ok(()) as Result<()> + }); futures::future::try_join_all(all_services_ready) }) diff --git a/hydro_deploy/core/src/hydroflow_crate/build.rs b/hydro_deploy/core/src/hydroflow_crate/build.rs index 12bc3424c485..72e20f367220 100644 --- a/hydro_deploy/core/src/hydroflow_crate/build.rs +++ b/hydro_deploy/core/src/hydroflow_crate/build.rs @@ -87,7 +87,7 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO .get_or_init(MemoMap::new) .get_or_insert(¶ms, Default::default) .get_or_try_init(move || { - ProgressTracker::rich_leaf("build".to_string(), move |_, set_msg| async move { + ProgressTracker::rich_leaf("build", move |set_msg| async move { tokio::task::spawn_blocking(move || { let mut command = Command::new("cargo"); command.args(["build"]); diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 372a1cddcdd8..f66bc89396a4 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -197,13 +197,12 @@ impl Service for HydroflowCrateService { } ProgressTracker::with_group( - &self - .display_id + self.display_id .clone() .unwrap_or_else(|| format!("service/{}", self.id)), None, || async { - let built = self.build().await?; + let built = ProgressTracker::leaf("build", self.build()).await?; let host = &self.on; let launched = host.provision(resource_result); @@ -223,8 +222,7 @@ impl Service for HydroflowCrateService { } ProgressTracker::with_group( - &self - .display_id + self.display_id .clone() .unwrap_or_else(|| format!("service/{}", self.id)), None, @@ -259,7 +257,7 @@ impl Service for HydroflowCrateService { binary.stdin().send(format!("{formatted_bind_config}\n"))?; let ready_line = ProgressTracker::leaf( - "waiting for ready".to_string(), + "waiting for ready", tokio::time::timeout(Duration::from_secs(60), stdout_receiver), ) .await??; @@ -317,8 +315,7 @@ impl Service for HydroflowCrateService { async fn stop(&mut self) -> Result<()> { ProgressTracker::with_group( - &self - .display_id + self.display_id .clone() .unwrap_or_else(|| format!("service/{}", self.id)), None, @@ -327,7 +324,7 @@ impl Service for HydroflowCrateService { launched_binary.stdin().send("stop\n".to_string())?; let timeout_result = ProgressTracker::leaf( - "waiting for exit".to_owned(), + "waiting for exit", tokio::time::timeout(Duration::from_secs(60), launched_binary.wait()), ) .await; diff --git a/hydro_deploy/core/src/progress.rs b/hydro_deploy/core/src/progress.rs index 1faa918d9565..e4f8bdd6b4f6 100644 --- a/hydro_deploy/core/src/progress.rs +++ b/hydro_deploy/core/src/progress.rs @@ -64,9 +64,6 @@ impl BarTree { } } BarTree::Group(name, pb, children, anticipated_total) => { - let mut path_with_group = cur_path.to_vec(); - path_with_group.push(name.clone()); - let finished_count = children .iter() .filter(|child| child.status() == LeafStatus::Finished) @@ -78,23 +75,46 @@ impl BarTree { let queued_count = anticipated_total.map(|total| total - finished_count - started_count); - match queued_count { - Some(queued_count) => { - pb.set_prefix(format!( - "{} ({}/{}/{})", - path_with_group.join(" / "), - finished_count, - started_count, - queued_count - )); - } - None => pb.set_prefix(format!( - "{} ({}/{})", - path_with_group.join(" / "), - finished_count, - started_count - )), + let progress_str = + if anticipated_total.iter().any(|v| *v == 1) && started_count == 1 { + "".to_string() + } else { + match queued_count { + Some(queued_count) => { + format!( + " ({}/{}/{})", + finished_count, + started_count, + queued_count + finished_count + started_count + ) + } + None => format!(" ({}/{}/?)", finished_count, started_count), + } + }; + + if cur_path.is_empty() { + pb.set_prefix(format!("{}{}", name, progress_str)); + } else { + pb.set_prefix(format!( + "{} / {}{}", + cur_path.join(" / "), + name, + progress_str, + )); } + + let mut path_with_group = cur_path.to_vec(); + let non_finished_count = children + .iter() + .filter(|child| child.status() != LeafStatus::Finished) + .count(); + + if non_finished_count == 1 { + path_with_group.push(format!("{}{}", name, progress_str)); + } else { + path_with_group.push(name.clone()); + } + for child in children { child.refresh_prefix(&path_with_group); } @@ -108,7 +128,7 @@ impl BarTree { } } - fn find_node(&mut self, path: &[usize]) -> &mut BarTree { + fn find_node(&self, path: &[usize]) -> &BarTree { if path.is_empty() { return self; } @@ -120,12 +140,26 @@ impl BarTree { _ => panic!(), } } + + fn find_node_mut(&mut self, path: &[usize]) -> &mut BarTree { + if path.is_empty() { + return self; + } + + match self { + BarTree::Root(children) | BarTree::Group(_, _, children, _) => { + children[path[0]].find_node_mut(&path[1..]) + } + _ => panic!(), + } + } } pub struct ProgressTracker { pub(crate) multi_progress: MultiProgress, tree: BarTree, pub(crate) current_count: usize, + progress_list: Vec<(Arc, bool)>, } impl ProgressTracker { @@ -134,6 +168,7 @@ impl ProgressTracker { multi_progress: MultiProgress::new(), tree: BarTree::Root(vec![]), current_count: 0, + progress_list: vec![], } } @@ -152,6 +187,49 @@ impl ProgressTracker { _ => panic!(), }; + if let Some(surrounding_pb) = &surrounding_pb { + let non_finished_count = surrounding_children + .iter() + .filter(|child| child.status() != LeafStatus::Finished) + .count(); + if non_finished_count == 0 { + self.multi_progress.remove(surrounding_pb.as_ref()); + let surrounding_idx = self + .progress_list + .iter() + .position(|(pb, _)| Arc::ptr_eq(pb, surrounding_pb)) + .unwrap(); + self.progress_list[surrounding_idx].1 = false; + } else if non_finished_count == 1 { + let self_idx = self + .progress_list + .iter() + .position(|(pb, _)| Arc::ptr_eq(pb, surrounding_pb)) + .unwrap(); + let last_visible_before = self.progress_list[..self_idx] + .iter() + .rposition(|(_, visible)| *visible); + if let Some(last_visible_before) = last_visible_before { + self.multi_progress.insert_after( + &self.progress_list[last_visible_before].0, + surrounding_pb.as_ref().clone(), + ); + } else { + self.multi_progress + .insert(0, surrounding_pb.as_ref().clone()); + } + + self.progress_list[self_idx].1 = true; + } + } + + let surrounding = self.tree.find_node_mut(&under_path); + let (surrounding_children, surrounding_pb) = match surrounding { + BarTree::Root(children) => (children, None), + BarTree::Group(_, pb, children, _) => (children, Some(pb)), + _ => panic!(), + }; + self.current_count += 1; let core_bar = indicatif::ProgressBar::new(100); @@ -160,15 +238,45 @@ impl ProgressTracker { .rev() .flat_map(|c| c.get_pb()) .next(); - let created_bar = if let Some(previous_bar) = previous_bar { - self.multi_progress.insert_after(previous_bar, core_bar) + + let index_to_insert = if let Some(previous_bar) = previous_bar { + let index_of_prev = self + .progress_list + .iter() + .position(|pb| Arc::ptr_eq(&pb.0, previous_bar)) + .unwrap(); + index_of_prev + 1 } else if let Some(group_pb) = surrounding_pb { - self.multi_progress.insert_after(group_pb, core_bar) + let index_of_group = self + .progress_list + .iter() + .position(|pb| Arc::ptr_eq(&pb.0, group_pb)) + .unwrap(); + index_of_group + 1 + } else if !self.progress_list.is_empty() { + self.progress_list.len() + } else { + 0 + }; + + let last_visible = if !self.progress_list.is_empty() { + self.progress_list[..index_to_insert] + .iter() + .rposition(|(_, visible)| *visible) + } else { + None + }; + + let created_bar = if let Some(last_visible) = last_visible { + self.multi_progress + .insert_after(&self.progress_list[last_visible].0, core_bar) } else { - self.multi_progress.add(core_bar) + self.multi_progress.insert(0, core_bar) }; let pb = Arc::new(created_bar); + self.progress_list + .insert(index_to_insert, (pb.clone(), true)); if group { surrounding_children.push(BarTree::Group(name, pb.clone(), vec![], anticipated_total)); } else { @@ -197,11 +305,50 @@ impl ProgressTracker { } pub fn end_task(&mut self, path: Vec) { - match self.tree.find_node(&path[0..path.len() - 1]) { + let parent = self.tree.find_node_mut(&path[0..path.len() - 1]); + match parent { BarTree::Root(children) | BarTree::Group(_, _, children, _) => { let removed = children[*path.last().unwrap()].get_pb().unwrap().clone(); children[*path.last().unwrap()] = BarTree::Finished; self.multi_progress.remove(&removed); + self.progress_list + .retain(|(pb, _)| !Arc::ptr_eq(pb, &removed)); + + let non_finished_count = children + .iter() + .filter(|child| child.status() != LeafStatus::Finished) + .count(); + if let BarTree::Group(_, pb, _, _) = parent { + if non_finished_count == 1 { + self.multi_progress.remove(pb.as_ref()); + self.progress_list + .iter_mut() + .find(|(pb2, _)| Arc::ptr_eq(pb2, pb)) + .unwrap() + .1 = false; + } else if non_finished_count == 0 { + let self_idx = self + .progress_list + .iter() + .position(|(pb2, _)| Arc::ptr_eq(pb2, pb)) + .unwrap(); + + let last_visible_before = self.progress_list[..self_idx] + .iter() + .rposition(|(_, visible)| *visible); + + if let Some(last_visible_before) = last_visible_before { + self.multi_progress.insert_after( + &self.progress_list[last_visible_before].0, + pb.as_ref().clone(), + ); + } else { + self.multi_progress.insert(0, pb.as_ref().clone()); + } + + self.progress_list[self_idx].1 = true; + } + } } _ => panic!(), @@ -229,7 +376,7 @@ impl ProgressTracker { } pub fn with_group<'a, T, F: Future>( - name: &str, + name: impl Into, anticipated_total: Option, f: impl FnOnce() -> F + 'a, ) -> impl Future + 'a { @@ -242,13 +389,7 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.start_task( - group.clone(), - name.to_string(), - true, - anticipated_total, - false, - ) + progress_bar.start_task(group.clone(), name.into(), true, anticipated_total, false) }; group.push(group_i); @@ -264,7 +405,10 @@ impl ProgressTracker { }) } - pub fn leaf>(name: String, f: F) -> impl Future { + pub fn leaf>( + name: impl Into, + f: F, + ) -> impl Future { let mut group = CURRENT_GROUP .try_with(|cur| cur.clone()) .unwrap_or_default(); @@ -274,7 +418,7 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.start_task(group.clone(), name, false, None, false) + progress_bar.start_task(group.clone(), name.into(), false, None, false) }; group.push(leaf_i); @@ -291,7 +435,40 @@ impl ProgressTracker { } pub fn rich_leaf<'a, T, F: Future>( - name: String, + name: impl Into, + f: impl FnOnce(Box) -> F + 'a, + ) -> impl Future + 'a { + let mut group = CURRENT_GROUP + .try_with(|cur| cur.clone()) + .unwrap_or_default(); + + let (leaf_i, bar) = { + let mut progress_bar = PROGRESS_TRACKER + .get_or_init(|| Mutex::new(ProgressTracker::new())) + .lock() + .unwrap(); + progress_bar.start_task(group.clone(), name.into(), false, None, false) + }; + + group.push(leaf_i); + + async move { + let my_bar = bar.clone(); + let out = f(Box::new(move |msg| { + my_bar.set_message(msg); + })) + .await; + let mut progress_bar = PROGRESS_TRACKER + .get_or_init(|| Mutex::new(ProgressTracker::new())) + .lock() + .unwrap(); + progress_bar.end_task(group); + out + } + } + + pub fn progress_leaf<'a, T, F: Future>( + name: impl Into, f: impl FnOnce(Box, Box) -> F + 'a, ) -> impl Future + 'a { let mut group = CURRENT_GROUP @@ -303,7 +480,7 @@ impl ProgressTracker { .get_or_init(|| Mutex::new(ProgressTracker::new())) .lock() .unwrap(); - progress_bar.start_task(group.clone(), name, false, None, true) + progress_bar.start_task(group.clone(), name.into(), false, None, true) }; group.push(leaf_i); diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index 26eb7bbe7808..355cf76d7ef7 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -96,7 +96,7 @@ impl LaunchedBinary for LaunchedSshBinary { async fn stop(&mut self) -> Result<()> { if !self.channel.eof() { - ProgressTracker::leaf("force stopping".to_owned(), async { + ProgressTracker::leaf("force stopping", async { self.channel.write_all(b"\x03").await?; // `^C` self.channel.send_eof().await?; self.channel.wait_eof().await?; @@ -112,7 +112,7 @@ impl LaunchedBinary for LaunchedSshBinary { let mut script_channel = self.session.as_ref().unwrap().channel_session().await?; let mut fold_er = Folder::from(perf.fold_options.clone().unwrap_or_default()); - let fold_data = ProgressTracker::leaf("perf script & folding".to_owned(), async move { + let fold_data = ProgressTracker::leaf("perf script & folding", async move { let mut stderr_lines = FuturesBufReader::new(script_channel.stderr()).lines(); let stdout = script_channel.stream(0); @@ -358,7 +358,7 @@ impl LaunchedHost for T { let temp_path = PathBuf::from(format!("/home/{user}/hydro-{random}")); let sftp = &sftp; - ProgressTracker::rich_leaf( + ProgressTracker::progress_leaf( format!("uploading binary to {}", binary_path.display()), |set_progress, _| { let binary = &binary; diff --git a/hydro_deploy/core/src/terraform.rs b/hydro_deploy/core/src/terraform.rs index f812c9fc4c93..73be99d697b0 100644 --- a/hydro_deploy/core/src/terraform.rs +++ b/hydro_deploy/core/src/terraform.rs @@ -119,7 +119,7 @@ impl TerraformBatch { }); } - ProgressTracker::with_group("terraform", None, || async { + ProgressTracker::with_group("terraform", Some(1), || async { let dothydro_folder = std::env::current_dir().unwrap().join(".hydro"); std::fs::create_dir_all(&dothydro_folder).unwrap(); let deployment_folder = tempfile::tempdir_in(dothydro_folder).unwrap();