diff --git a/crates/turborepo-lib/src/cli/mod.rs b/crates/turborepo-lib/src/cli/mod.rs
index dbf3438c82dde..dd9dc72a7a1c7 100644
--- a/crates/turborepo-lib/src/cli/mod.rs
+++ b/crates/turborepo-lib/src/cli/mod.rs
@@ -1286,7 +1286,8 @@ pub async fn run(
             event.track_call();
             let base = CommandBase::new(cli_args, repo_root, version, ui);
-            WatchClient::start(base, event).await?;
+            let mut client = WatchClient::new(base, event).await?;
+            client.start().await?;
             // We only exit if we get a signal, so we return a non-zero exit code
             return Ok(1);
         }
diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs
index 5d7cf52c9bb31..c8e6891a7192e 100644
--- a/crates/turborepo-lib/src/daemon/server.rs
+++ b/crates/turborepo-lib/src/daemon/server.rs
@@ -18,7 +18,7 @@ use semver::Version;
 use thiserror::Error;
 use tokio::{
     select,
-    sync::{mpsc, oneshot},
+    sync::{broadcast::error::RecvError, mpsc, oneshot},
     task::JoinHandle,
 };
 use tokio_stream::wrappers::ReceiverStream;
@@ -594,7 +594,8 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
             .package_changes_watcher
             .package_changes()
             .await;
-        let (tx, rx) = mpsc::channel(1);
+
+        let (tx, rx) = mpsc::channel(1024);
 
         tx.send(Ok(proto::PackageChangeEvent {
             event: Some(proto::package_change_event::Event::RediscoverPackages(
                 proto::RediscoverPackages {},
             )),
         }
@@ -607,6 +608,14 @@
         tokio::spawn(async move {
             loop {
                 let event = match package_changes_rx.recv().await {
+                    Err(RecvError::Lagged(_)) => {
+                        warn!("package changes stream lagged");
+                        proto::PackageChangeEvent {
+                            event: Some(proto::package_change_event::Event::RediscoverPackages(
+                                proto::RediscoverPackages {},
+                            )),
+                        }
+                    }
                     Err(err) => proto::PackageChangeEvent {
                         event: Some(proto::package_change_event::Event::Error(
                             proto::PackageChangeError {
diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs
index 40f6539685263..d6e577d952504 100644
--- a/crates/turborepo-lib/src/engine/mod.rs
+++ b/crates/turborepo-lib/src/engine/mod.rs
@@ -142,11 +142,15 @@ impl Engine {
     /// Creates an instance of `Engine` that only contains tasks that depend on
     /// tasks from a given package. This is useful for watch mode, where we
     /// need to re-run only a portion of the task graph.
-    pub fn create_engine_for_subgraph(&self, changed_package: &PackageName) -> Engine {
-        let entrypoint_indices: &[petgraph::graph::NodeIndex] = self
-            .package_tasks
-            .get(changed_package)
-            .map_or(&[], |v| &v[..]);
+    pub fn create_engine_for_subgraph(
+        &self,
+        changed_packages: &HashSet<PackageName>,
+    ) -> Engine {
+        let entrypoint_indices: Vec<_> = changed_packages
+            .iter()
+            .flat_map(|pkg| self.package_tasks.get(pkg))
+            .flatten()
+            .collect();
 
         // We reverse the graph because we want the *dependents* of entrypoint tasks
         let mut reversed_graph = self.task_graph.clone();
@@ -175,7 +179,7 @@ impl Engine {
                 .iter()
                 .any(|idx| {
                     node_distances
-                        .get(&(*idx, node_idx))
+                        .get(&(**idx, node_idx))
                        .map_or(false, |dist| *dist != i32::MAX)
                 })
                 .then_some(node.clone())
@@ -764,7 +768,8 @@ mod test {
         engine.task_graph.add_edge(b_build_idx, a_build_idx, ());
 
         let engine = engine.seal();
-        let subgraph = engine.create_engine_for_subgraph(&PackageName::from("a"));
+        let subgraph =
+            engine.create_engine_for_subgraph(&[PackageName::from("a")].into_iter().collect());
 
         // Verify that the subgraph only contains tasks from package `a` and the `build`
         // task from package `b`
diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs
index 1caff844e8fa5..2eb49f335dde8 100644
--- a/crates/turborepo-lib/src/run/builder.rs
+++ b/crates/turborepo-lib/src/run/builder.rs
@@ -62,7 +62,7 @@ pub struct RunBuilder {
     // In watch mode, we can have a changed package that we want to serve as an entrypoint.
     // We will then prune away any tasks that do not depend on tasks inside
     // this package.
-    entrypoint_package: Option<PackageName>,
+    entrypoint_packages: Option<HashSet<PackageName>>,
     should_print_prelude_override: Option<bool>,
 }
@@ -114,13 +114,13 @@
             version,
             experimental_ui,
             analytics_sender: None,
-            entrypoint_package: None,
+            entrypoint_packages: None,
             should_print_prelude_override: None,
         })
     }
 
-    pub fn with_entrypoint_package(mut self, entrypoint_package: PackageName) -> Self {
-        self.entrypoint_package = Some(entrypoint_package);
+    pub fn with_entrypoint_packages(mut self, entrypoint_packages: HashSet<PackageName>) -> Self {
+        self.entrypoint_packages = Some(entrypoint_packages);
         self
     }
@@ -451,8 +451,8 @@ impl RunBuilder {
 
         // If we have an initial task, we prune out the engine to only
         // tasks that are reachable from that initial task.
-        if let Some(entrypoint_package) = &self.entrypoint_package {
-            engine = engine.create_engine_for_subgraph(entrypoint_package);
+        if let Some(entrypoint_packages) = &self.entrypoint_packages {
+            engine = engine.create_engine_for_subgraph(entrypoint_packages);
         }
 
         if !self.opts.run_opts.parallel {
diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs
index d172e39ff78ab..cf024ad53c253 100644
--- a/crates/turborepo-lib/src/run/watch.rs
+++ b/crates/turborepo-lib/src/run/watch.rs
@@ -1,9 +1,13 @@
-use std::collections::HashMap;
+use std::collections::HashSet;
 
 use futures::StreamExt;
 use miette::{Diagnostic, SourceSpan};
 use thiserror::Error;
-use tokio::{select, task::JoinHandle};
+use tokio::{
+    select,
+    sync::watch,
+    task::{yield_now, JoinHandle},
+};
 use turborepo_repository::package_graph::PackageName;
 use turborepo_telemetry::events::command::CommandEventBuilder;
@@ -18,7 +22,35 @@ use crate::{
     DaemonConnector, DaemonPaths,
 };
 
-pub struct WatchClient {}
+#[derive(Clone)]
+pub enum ChangedPackages {
+    All,
+    Some(HashSet<PackageName>),
+}
+
+impl Default for ChangedPackages {
+    fn default() -> Self {
+        ChangedPackages::Some(HashSet::new())
+    }
+}
+
+impl ChangedPackages {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            ChangedPackages::All => false,
+            ChangedPackages::Some(pkgs) => pkgs.is_empty(),
+        }
+    }
+}
+
+pub struct WatchClient {
+    run: Run,
+    persistent_tasks_handle: Option<JoinHandle<Result<i32, run::Error>>>,
+    connector: DaemonConnector,
+    base: CommandBase,
+    telemetry: CommandEventBuilder,
+    handler: SignalHandler,
+}
 
 #[derive(Debug, Error, Diagnostic)]
 pub enum Error {
@@ -51,20 +83,22 @@
     },
     #[error("daemon connection closed")]
     ConnectionClosed,
+    #[error("failed to subscribe to signal handler, shutting down")]
+    NoSignalHandler,
     #[error("watch interrupted due to signal")]
     SignalInterrupt,
     #[error("package change error")]
     PackageChange(#[from] tonic::Status),
+    #[error("changed packages channel closed, cannot receive new changes")]
+    ChangedPackagesRecv(#[from] watch::error::RecvError),
+    #[error("changed packages channel closed, cannot send new changes")]
+    ChangedPackagesSend(#[from] watch::error::SendError<ChangedPackages>),
 }
 
 impl WatchClient {
-    pub async fn start(base: CommandBase, telemetry: CommandEventBuilder) -> Result<(), Error> {
+    pub async fn new(base: CommandBase, telemetry: CommandEventBuilder) -> Result<Self, Error> {
         let signal = commands::run::get_signal()?;
         let handler = SignalHandler::new(signal);
-        let Some(signal_subscriber) = handler.subscribe() else {
-            tracing::warn!("failed to subscribe to signal handler, shutting down");
-            return Ok(());
-        };
 
         let Some(Command::Watch(execution_args)) = &base.args().command else {
             unreachable!()
         };
@@ -76,43 +110,56 @@
             execution_args: execution_args.clone(),
         });
 
-        let mut run = RunBuilder::new(new_base)?
+        let run = RunBuilder::new(new_base)?
             .build(&handler, telemetry.clone())
             .await?;
 
-        run.print_run_prelude();
-
         let connector = DaemonConnector {
             can_start_server: true,
             can_kill_server: true,
             paths: DaemonPaths::from_repo_root(&base.repo_root),
         };
 
+        Ok(Self {
+            base,
+            run,
+            connector,
+            handler,
+            telemetry,
+            persistent_tasks_handle: None,
+        })
+    }
+
+    pub async fn start(&mut self) -> Result<(), Error> {
+        let connector = self.connector.clone();
         let mut client = connector.connect().await?;
 
         let mut events = client.package_changes().await?;
-        let mut current_runs: HashMap<PackageName, JoinHandle<Result<i32, run::Error>>> =
-            HashMap::new();
-        let mut persistent_tasks_handle = None;
+
+        self.run.print_run_prelude();
+
+        let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;
+
+        let (changed_pkgs_tx, mut changed_pkgs_rx) = watch::channel(ChangedPackages::default());
 
         let event_fut = async {
             while let Some(event) = events.next().await {
                 let event = event?;
-                Self::handle_change_event(
-                    &mut run,
-                    event.event.unwrap(),
-                    &mut current_runs,
-                    &base,
-                    &telemetry,
-                    &handler,
-                    &mut persistent_tasks_handle,
-                )
-                .await?;
+                Self::handle_change_event(&changed_pkgs_tx, event.event.unwrap()).await?;
             }
 
             Err(Error::ConnectionClosed)
         };
 
+        let run_fut = async {
+            loop {
+                changed_pkgs_rx.changed().await?;
+                let changed_pkgs = { changed_pkgs_rx.borrow_and_update().clone() };
+
+                self.execute_run(changed_pkgs).await?;
+            }
+        };
+
         select! {
             biased;
             _ = signal_subscriber.listen() => {
@@ -123,17 +170,15 @@ impl WatchClient {
             result = event_fut => {
                 result
             }
+            run_result = run_fut => {
+                run_result
+            }
         }
     }
 
     async fn handle_change_event(
-        run: &mut Run,
+        changed_packages_tx: &watch::Sender<ChangedPackages>,
         event: proto::package_change_event::Event,
-        current_runs: &mut HashMap<PackageName, JoinHandle<Result<i32, run::Error>>>,
-        base: &CommandBase,
-        telemetry: &CommandEventBuilder,
-        handler: &SignalHandler,
-        persistent_tasks_handle: &mut Option<JoinHandle<Result<i32, run::Error>>>,
     ) -> Result<(), Error> {
         // Should we recover here?
         match event {
@@ -141,12 +186,40 @@
                 package_name,
             }) => {
                 let package_name = PackageName::from(package_name);
-                // If not in the filtered pkgs, ignore
-                if !run.filtered_pkgs.contains(&package_name) {
-                    return Ok(());
-                }
 
-                let mut args = base.args().clone();
+                changed_packages_tx.send_if_modified(|changed_pkgs| match changed_pkgs {
+                    ChangedPackages::All => false,
+                    ChangedPackages::Some(ref mut pkgs) => {
+                        pkgs.insert(package_name);
+
+                        true
+                    }
+                });
+            }
+            proto::package_change_event::Event::RediscoverPackages(_) => {
+                changed_packages_tx.send(ChangedPackages::All)?;
+            }
+            proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
+                return Err(DaemonError::Unavailable(message).into());
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<i32, Error> {
+        // Should we recover here?
+        match changed_packages {
+            ChangedPackages::Some(packages) => {
+                let packages = packages
+                    .into_iter()
+                    .filter(|pkg| {
+                        // If not in the filtered pkgs, ignore
+                        self.run.filtered_pkgs.contains(pkg)
+                    })
+                    .collect();
+
+                let mut args = self.base.args().clone();
                 args.command = args.command.map(|c| {
                     if let Command::Watch(execution_args) = c {
                         Command::Run {
@@ -162,32 +235,26 @@
                     }
                 });
 
-                let new_base =
-                    CommandBase::new(args, base.repo_root.clone(), get_version(), base.ui);
-
-                // TODO: Add logic on when to abort vs wait
-                if let Some(run) = current_runs.remove(&package_name) {
-                    run.abort();
-                }
+                let new_base = CommandBase::new(
+                    args,
+                    self.base.repo_root.clone(),
+                    get_version(),
+                    self.base.ui,
+                );
 
-                let signal_handler = handler.clone();
-                let telemetry = telemetry.clone();
+                let signal_handler = self.handler.clone();
+                let telemetry = self.telemetry.clone();
 
-                current_runs.insert(
-                    package_name.clone(),
-                    tokio::spawn(async move {
-                        let mut run = RunBuilder::new(new_base)?
-                            .with_entrypoint_package(package_name)
-                            .hide_prelude()
-                            .build(&signal_handler, telemetry)
-                            .await?;
+                let mut run = RunBuilder::new(new_base)?
+                    .with_entrypoint_packages(packages)
+                    .hide_prelude()
+                    .build(&signal_handler, telemetry)
+                    .await?;
 
-                        run.run().await
-                    }),
-                );
+                Ok(run.run().await?)
             }
-            proto::package_change_event::Event::RediscoverPackages(_) => {
-                let mut args = base.args().clone();
+            ChangedPackages::All => {
+                let mut args = self.base.args().clone();
                 args.command = args.command.map(|c| {
                     if let Command::Watch(execution_args) = c {
                         Command::Run {
@@ -203,43 +270,38 @@
                     }
                 });
 
-                let base = CommandBase::new(args, base.repo_root.clone(), get_version(), base.ui);
-
-                // When we rediscover, stop all current runs
-                for (_, run) in current_runs.drain() {
-                    run.abort();
-                }
+                let base = CommandBase::new(
+                    args,
+                    self.base.repo_root.clone(),
+                    get_version(),
+                    self.base.ui,
+                );
 
                 // rebuild run struct
-                *run = RunBuilder::new(base.clone())?
+                self.run = RunBuilder::new(base.clone())?
                     .hide_prelude()
-                    .build(handler, telemetry.clone())
+                    .build(&self.handler, self.telemetry.clone())
                     .await?;
 
-                if run.has_persistent_tasks() {
+                if self.run.has_persistent_tasks() {
                     // Abort old run
-                    if let Some(run) = persistent_tasks_handle.take() {
+                    if let Some(run) = self.persistent_tasks_handle.take() {
                         run.abort();
                     }
 
-                    let mut persistent_run = run.create_run_for_persistent_tasks();
+                    let mut persistent_run = self.run.create_run_for_persistent_tasks();
                     // If we have persistent tasks, we run them on a separate thread
                     // since persistent tasks don't finish
-                    *persistent_tasks_handle =
+                    self.persistent_tasks_handle =
                        Some(tokio::spawn(async move { persistent_run.run().await }));
 
                     // But we still run the regular tasks blocking
-                    let mut non_persistent_run = run.create_run_without_persistent_tasks();
-                    non_persistent_run.run().await?;
+                    let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
+                    Ok(non_persistent_run.run().await?)
                 } else {
-                    run.run().await?;
+                    Ok(self.run.run().await?)
                 }
             }
-            proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
-                return Err(DaemonError::Unavailable(message).into());
-            }
         }
-
-        Ok(())
     }
 }
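
A note on the server.rs change: the daemon fans package-change notifications out of a tokio `broadcast` channel, and a subscriber that falls behind receives `RecvError::Lagged` instead of the missed messages. The diff maps that case to a blanket `RediscoverPackages` event rather than an error. Below is a minimal, self-contained sketch of the same recovery pattern, assuming only the `tokio` crate; the `Event` enum here is a stand-in for the generated proto types, not the daemon's real API:

use tokio::sync::broadcast::{self, error::RecvError};

// Stand-in for proto::package_change_event::Event.
#[derive(Clone, Debug)]
enum Event {
    PackageChanged(String),
    RediscoverPackages,
}

#[tokio::main]
async fn main() {
    // Deliberately tiny capacity so the receiver lags in this demo.
    let (tx, mut rx) = broadcast::channel(2);

    // Four sends into a two-slot ring buffer: the oldest two are dropped.
    for pkg in ["a", "b", "c", "d"] {
        tx.send(Event::PackageChanged(pkg.to_string())).unwrap();
    }
    drop(tx);

    loop {
        let event = match rx.recv().await {
            Ok(event) => event,
            // We missed messages, so we no longer know which packages
            // changed; degrade to a full rediscover instead of erroring.
            Err(RecvError::Lagged(skipped)) => {
                eprintln!("lagged, skipped {skipped} events");
                Event::RediscoverPackages
            }
            // All senders dropped: the stream is over.
            Err(RecvError::Closed) => break,
        };
        println!("forwarding {event:?}");
    }
}

Forcing a two-slot buffer makes the lag reproducible; the diff sizes the daemon's forwarding mpsc channel at 1024 so this path should stay rare in practice.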
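On the engine change: `create_engine_for_subgraph` now keeps a task if it transitively depends on any task of any changed package, which the PR computes by cloning the task graph, reversing it, and consulting a node-distance map. The same reachability question can be expressed more compactly with petgraph's `Reversed` adapter and a DFS. This is an illustrative sketch of that idea only, not the PR's implementation (petgraph is already a dependency of the crate; the node names are invented):

use petgraph::{
    graph::{DiGraph, NodeIndex},
    visit::{Dfs, Reversed},
};

fn main() {
    // Task graph: an edge x -> y means "x depends on y".
    let mut graph = DiGraph::<&str, ()>::new();
    let a_build = graph.add_node("a#build");
    let b_build = graph.add_node("b#build");
    let _c_build = graph.add_node("c#build"); // unrelated; should be pruned
    graph.add_edge(b_build, a_build, ()); // b#build depends on a#build

    // Entrypoints: every task belonging to a changed package ("a" here).
    let entrypoints = [a_build];

    // Walking the *reversed* graph from the entrypoints visits exactly
    // the tasks that transitively depend on them.
    let reversed = Reversed(&graph);
    let mut keep: Vec<NodeIndex> = Vec::new();
    for &entry in &entrypoints {
        let mut dfs = Dfs::new(reversed, entry);
        while let Some(node) = dfs.next(reversed) {
            keep.push(node);
        }
    }
    keep.sort();
    keep.dedup();

    for node in keep {
        // Prints a#build and b#build, but not c#build.
        println!("keeping {}", graph[node]);
    }
}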
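And on the watch-client rewrite: the per-package `HashMap` of spawned runs is replaced by a single `tokio::sync::watch` channel. The event handler folds incoming events into a `ChangedPackages` value via `send_if_modified`, and one consumer loop snapshots it with `borrow_and_update`, so a burst of file changes coalesces into one rebuild. A minimal standalone sketch of that pattern, assuming only `tokio` (the `Changed` enum mirrors the diff's `ChangedPackages`; plain `String`s stand in for `PackageName`):

use std::collections::HashSet;

use tokio::sync::watch;

// Mirrors the diff's ChangedPackages: either "rediscover everything"
// or an accumulated set of changed package names.
#[derive(Clone, Debug)]
enum Changed {
    All,
    Some(HashSet<String>),
}

impl Default for Changed {
    fn default() -> Self {
        Changed::Some(HashSet::new())
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Changed::default());

    // Producer: fold each event into the current value. send_if_modified
    // only flags the channel as changed when the closure returns true, so
    // events arriving in the All state (or duplicates) are no-ops.
    let producer = tokio::spawn(async move {
        for pkg in ["web", "docs", "web"] {
            tx.send_if_modified(|state| match state {
                Changed::All => false,
                Changed::Some(pkgs) => pkgs.insert(pkg.to_string()),
            });
        }
        // A rediscover event overwrites whatever was accumulated.
        tx.send(Changed::All).unwrap();
    });

    // Consumer: changed() waits for at least one write since the last
    // borrow_and_update(), so a burst of events collapses into a single
    // snapshot (and thus a single run).
    while rx.changed().await.is_ok() {
        let snapshot = rx.borrow_and_update().clone();
        println!("would run with {snapshot:?}");
    }

    producer.await.unwrap();
}

Because `send_if_modified` returns false when nothing actually changed, redundant events never wake the consumer, which is what lets the client drop the abort-and-respawn bookkeeping of the old `current_runs` map.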