From 6cf4bea40594cde8c07b74a2d1cb257c3cfdeefd Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Tue, 30 Apr 2024 16:32:29 -0400 Subject: [PATCH 1/5] Split package changes watcher into an event processing loop and a change mapper loop --- Cargo.lock | 1 + Cargo.toml | 1 + crates/turborepo-filewatch/Cargo.toml | 2 +- crates/turborepo-lib/Cargo.toml | 1 + .../src/package_changes_watcher.rs | 310 +++++++++++------- 5 files changed, 197 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7927028539d9..81b7407b90097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11648,6 +11648,7 @@ dependencies = [ "pprof", "pretty_assertions", "prost 0.12.3", + "radix_trie", "rand 0.8.5", "rayon", "regex", diff --git a/Cargo.toml b/Cargo.toml index 1d5e62f472e00..bd6f407cc125c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -259,6 +259,7 @@ pretty_assertions = "1.3.0" proc-macro2 = "1.0.79" qstring = "0.7.2" quote = "1.0.23" +radix_trie = "0.2.1" rand = "0.8.5" ratatui = "0.26.1" regex = "1.7.0" diff --git a/crates/turborepo-filewatch/Cargo.toml b/crates/turborepo-filewatch/Cargo.toml index 594fb37f1914c..3b8285a80c9db 100644 --- a/crates/turborepo-filewatch/Cargo.toml +++ b/crates/turborepo-filewatch/Cargo.toml @@ -14,7 +14,7 @@ futures = { version = "0.3.26" } itertools = { workspace = true } nibble_vec = "0.1.0" notify = { workspace = true } -radix_trie = "0.2.1" +radix_trie = { workspace = true } thiserror = "1.0.38" tokio = { workspace = true, features = ["full", "time"] } tracing = "0.1.37" diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 3bb9525c3279d..041a1fd31e9a5 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -81,6 +81,7 @@ pprof = { version = "0.12.1", features = [ "frame-pointer", ], optional = true } prost = "0.12.3" +radix_trie = { workspace = true } rand = { workspace = true } rayon = "1.7.0" regex.workspace = true diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index ef1f25cf3fd1e..f569a1ce87381 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -1,8 +1,9 @@ -use std::collections::HashSet; +use std::{cell::RefCell, collections::HashSet, ops::DerefMut, path::PathBuf}; use ignore::gitignore::Gitignore; use notify::Event; -use tokio::sync::{broadcast, oneshot}; +use radix_trie::{Trie, TrieCommon}; +use tokio::sync::{broadcast, oneshot, Mutex}; use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf}; use turborepo_filewatch::{NotifyError, OptionalWatch}; use turborepo_repository::{ @@ -53,8 +54,29 @@ impl PackageChangesWatcher { } } +enum ChangedFiles { + All, + Some(Trie), +} + +impl ChangedFiles { + fn is_empty(&self) -> bool { + match self { + ChangedFiles::All => false, + ChangedFiles::Some(trie) => trie.is_empty(), + } + } +} + +impl Default for ChangedFiles { + fn default() -> Self { + ChangedFiles::Some(Trie::new()) + } +} + struct Subscriber { file_events_lazy: OptionalWatch>>, + changed_files: Mutex>, repo_root: AbsoluteSystemPathBuf, package_change_events_tx: broadcast::Sender, } @@ -108,11 +130,12 @@ impl Subscriber { Subscriber { repo_root, file_events_lazy, + changed_files: Default::default(), package_change_events_tx, } } - async fn initialize_repo_state(&mut self) -> Option { + async fn initialize_repo_state(&self) -> Option<(RepoState, Gitignore)> { let Ok(root_package_json) = PackageJson::load(&self.repo_root.join_component("package.json")) else { @@ -128,6 +151,9 @@ impl Subscriber { ) .ok(); + let gitignore_path = self.repo_root.join_component(".gitignore"); + let (root_gitignore, _) = Gitignore::new(&gitignore_path); + let Ok(pkg_dep_graph) = PackageGraphBuilder::new(&self.repo_root, root_package_json) .build() .await @@ -136,24 +162,59 @@ impl Subscriber { return None; }; - Some(RepoState { - root_turbo_json, - pkg_dep_graph, - }) + Some(( + RepoState { + root_turbo_json, + pkg_dep_graph, + }, + root_gitignore, + )) } async fn watch(mut self, exit_rx: oneshot::Receiver<()>) { - let process = async { + let Ok(mut file_events) = self.file_events_lazy.get().await.map(|r| r.resubscribe()) else { + // if we get here, it means that file watching has not started, so we should + // just report that the package watcher is not available + tracing::debug!("file watching shut down, package watcher not available"); + return; + }; + + let event_fut = async { + loop { + match file_events.recv().await { + Ok(Ok(Event { paths, .. })) => { + if let ChangedFiles::Some(trie) = + self.changed_files.lock().await.borrow_mut().deref_mut() + { + for path in paths { + trie.insert(path, ()); + } + } + } + Ok(Err(err)) => { + tracing::error!("file event error: {:?}", err); + break; + } + Err(broadcast::error::RecvError::Lagged(_)) => { + tracing::warn!("file event lagged"); + // Lagged essentially means we're not keeping up with + // the file events, so + // we can catch up by declaring all files changed + *self.changed_files.lock().await.borrow_mut() = ChangedFiles::All; + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("file event channel closed"); + break; + } + } + } + }; + + let changes_fut = async { let root_pkg = WorkspacePackage::root(); - let Ok(mut file_events) = self.file_events_lazy.get().await.map(|r| r.resubscribe()) - else { - // if we get here, it means that file watching has not started, so we should - // just report that the package watcher is not available - tracing::debug!("file watching shut down, package watcher not available"); - return; - }; - let Some(mut repo_state) = self.initialize_repo_state().await else { + let Some((mut repo_state, mut root_gitignore)) = self.initialize_repo_state().await + else { return; }; @@ -167,121 +228,133 @@ impl Subscriber { self.package_change_events_tx .send(PackageChangeEvent::Rediscover) .ok(); + let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); loop { - match file_events.recv().await { - Ok(Ok(Event { paths, .. })) => { - // No point in raising an error for an invalid .gitignore - // This is slightly incorrect because we should also search for the - // .gitignore files in the workspaces. - let (root_gitignore, _) = - Gitignore::new(&self.repo_root.join_component(".gitignore")); - - let changed_files: HashSet<_> = paths - .into_iter() - .filter_map(|p| { - let p = AbsoluteSystemPathBuf::try_from(p).ok()?; - self.repo_root.anchor(p).ok() - }) - .filter(|p| { - // If in .gitignore or in .git, filter out - !(ancestors_is_ignored(&root_gitignore, p) || is_in_git_folder(p)) - }) - .collect(); - - let changes = change_mapper.changed_packages(changed_files.clone(), None); - - match changes { - Ok(PackageChanges::All) => { - // We tell the client that we need to rediscover the packages, i.e. - // all bets are off, just re-run everything - let _ = self - .package_change_events_tx - .send(PackageChangeEvent::Rediscover); - match self.initialize_repo_state().await { - Some(new_repo_state) => { - repo_state = new_repo_state; - change_mapper = match repo_state.get_change_mapper() { - Some(change_mapper) => change_mapper, - None => { - break; - } - }; - } - None => { - break; - } - } - } - Ok(PackageChanges::Some(mut changed_pkgs)) => { - if !changed_pkgs.is_empty() { - tracing::debug!( - "changed files: {:?} changed packages: {:?}", - changed_files, - changed_pkgs - ); - } + interval.tick().await; - // If the root package has changed, we only send it if we have root - // tasks. Otherwise it's not worth sending as it will only - // pollute up the output logs - if changed_pkgs.contains(&root_pkg) { - let has_root_tasks = repo_state - .root_turbo_json - .as_ref() - .map_or(false, |turbo| turbo.has_root_tasks()); - if !has_root_tasks { - changed_pkgs.remove(&root_pkg); - } - } + let changed_files = self.changed_files.lock().await; + if changed_files.borrow().is_empty() { + continue; + } + + let ChangedFiles::Some(trie) = changed_files.take() else { + let _ = self + .package_change_events_tx + .send(PackageChangeEvent::Rediscover); - for pkg in changed_pkgs { - let _ = self.package_change_events_tx.send( - PackageChangeEvent::Package { - name: pkg.name.clone(), - }, - ); + match self.initialize_repo_state().await { + Some((new_repo_state, new_gitignore)) => { + repo_state = new_repo_state; + root_gitignore = new_gitignore; + change_mapper = match repo_state.get_change_mapper() { + Some(change_mapper) => change_mapper, + None => { + break; } - } - Err(err) => { - // Log the error, rediscover the packages and try again - tracing::error!("error: {:?}", err); - - let _ = self - .package_change_events_tx - .send(PackageChangeEvent::Rediscover); - match self.initialize_repo_state().await { - Some(new_repo_state) => { - repo_state = new_repo_state; - change_mapper = match repo_state.get_change_mapper() { - Some(change_mapper) => change_mapper, - None => { - break; - } - } - } + }; + } + None => { + break; + } + } + continue; + }; + + let gitignore_path = self.repo_root.join_component(".gitignore"); + if trie.get(gitignore_path.as_std_path()).is_some() { + let (new_root_gitignore, _) = Gitignore::new(&gitignore_path); + root_gitignore = new_root_gitignore; + } + + let changed_files: HashSet<_> = trie + .keys() + .filter_map(|p| { + let p = AbsoluteSystemPathBuf::try_from(p.as_path()).ok()?; + self.repo_root.anchor(p).ok() + }) + .filter(|p| { + // If in .gitignore or in .git, filter out + !(ancestors_is_ignored(&root_gitignore, p) || is_in_git_folder(p)) + }) + .collect(); + + if changed_files.is_empty() { + continue; + } + + let changed_packages = change_mapper.changed_packages(changed_files.clone(), None); + + tracing::warn!("changed_files: {:?}", changed_files); + tracing::warn!("changed_packages: {:?}", changed_packages); + + match changed_packages { + Ok(PackageChanges::All) => { + // We tell the client that we need to rediscover the packages, i.e. + // all bets are off, just re-run everything + let _ = self + .package_change_events_tx + .send(PackageChangeEvent::Rediscover); + match self.initialize_repo_state().await { + Some((new_repo_state, new_gitignore)) => { + repo_state = new_repo_state; + root_gitignore = new_gitignore; + change_mapper = match repo_state.get_change_mapper() { + Some(change_mapper) => change_mapper, None => { break; } - } + }; + } + None => { + break; } } } - Ok(Err(err)) => { - tracing::error!("file event error: {:?}", err); - break; + Ok(PackageChanges::Some(mut filtered_pkgs)) => { + // If the root package has changed, we only send it if we have root + // tasks. Otherwise it's not worth sending as it will only + // pollute up the output logs + if filtered_pkgs.contains(&root_pkg) { + let has_root_tasks = repo_state + .root_turbo_json + .as_ref() + .map_or(false, |turbo| turbo.has_root_tasks()); + if !has_root_tasks { + filtered_pkgs.remove(&root_pkg); + } + } + + for pkg in filtered_pkgs { + let _ = + self.package_change_events_tx + .send(PackageChangeEvent::Package { + name: pkg.name.clone(), + }); + } } - Err(broadcast::error::RecvError::Lagged(_)) => { - tracing::warn!("file event lagged"); - // Lagged essentially means we're not keeping up with the file events, so - // we can catch up by sending a rediscover event + Err(err) => { + // Log the error, rediscover the packages and try again + tracing::error!("error: {:?}", err); + let _ = self .package_change_events_tx .send(PackageChangeEvent::Rediscover); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::debug!("file event channel closed"); - break; + match self.initialize_repo_state().await { + Some((new_repo_state, new_gitignore)) => { + repo_state = new_repo_state; + root_gitignore = new_gitignore; + change_mapper = match repo_state.get_change_mapper() { + Some(change_mapper) => change_mapper, + None => { + break; + } + } + } + None => { + break; + } + } } } } @@ -292,7 +365,10 @@ impl Subscriber { _ = exit_rx => { tracing::debug!("exiting package changes watcher due to signal"); }, - _ = process => { + _ = event_fut => { + tracing::debug!("exiting package changes watcher due to file event end"); + }, + _ = changes_fut => { tracing::debug!("exiting package changes watcher due to process end"); } } From f13feb44810d1c7e5f4b18b93ff319079a0380a9 Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Tue, 30 Apr 2024 16:52:26 -0400 Subject: [PATCH 2/5] Add comments --- crates/turborepo-filewatch/src/lib.rs | 22 +++++++++++++++++++ .../src/package_changes_watcher.rs | 2 ++ 2 files changed, 24 insertions(+) diff --git a/crates/turborepo-filewatch/src/lib.rs b/crates/turborepo-filewatch/src/lib.rs index 3f7565341c1e5..b89f877063eec 100644 --- a/crates/turborepo-filewatch/src/lib.rs +++ b/crates/turborepo-filewatch/src/lib.rs @@ -1,3 +1,25 @@ +//! File watching utilities for Turborepo. Includes a file watcher that is +//! designed to work across multiple platforms, with consistent behavior and +//! consistent ordering. +//! +//! Also includes watchers that take in file change events and produce derived +//! data like changed packages or the workspaces in a repository. +//! +//! ## Watcher Implementation +//! It's important to note that when implementing a watcher, you should aim to +//! make file change event processing as fast as possible. There should be +//! almost no slow code in the main event loop. Otherwise, the receiver will lag +//! behind, and return a `Lagged` event. +//! +//! A common pattern that we use to avoid lag is having a separate event thread +//! that processes events and accumulates them into a data structure, say a +//! `Trie` for changed files, or a `HashSet` for changed packages. From there, a +//! second thread is responsible for actually taking that accumulated data and +//! processing it. This second thread can do slower tasks like executing a run +//! or mapping files to changed packages. It can either be parked and awoken +//! using `tokio::sync::Notify` or it can run periodically using +//! `tokio::time::interval`. + #![deny(clippy::all)] #![feature(assert_matches)] diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index f569a1ce87381..027fdaef79089 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -179,6 +179,8 @@ impl Subscriber { return; }; + // This just processes the events and puts the changed files into a `Trie`. + // Must be fast to avoid lagging the file events channel. let event_fut = async { loop { match file_events.recv().await { From fe349011391b048e01a45465fb68cef60189260a Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Fri, 3 May 2024 14:38:31 -0400 Subject: [PATCH 3/5] Fix for windows --- crates/turborepo-lib/src/package_changes_watcher.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index 027fdaef79089..c5874da764bf1 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, collections::HashSet, ops::DerefMut, path::PathBuf}; +use std::{cell::RefCell, collections::HashSet, ops::DerefMut}; use ignore::gitignore::Gitignore; use notify::Event; @@ -56,7 +56,8 @@ impl PackageChangesWatcher { enum ChangedFiles { All, - Some(Trie), + // Trie doesn't support PathBuf as a key on Windows, so we need to use `String` + Some(Trie), } impl ChangedFiles { @@ -189,7 +190,9 @@ impl Subscriber { self.changed_files.lock().await.borrow_mut().deref_mut() { for path in paths { - trie.insert(path, ()); + if let Some(path) = path.to_str() { + trie.insert(path.to_string(), ()); + } } } } @@ -264,7 +267,7 @@ impl Subscriber { }; let gitignore_path = self.repo_root.join_component(".gitignore"); - if trie.get(gitignore_path.as_std_path()).is_some() { + if trie.get(gitignore_path.as_str()).is_some() { let (new_root_gitignore, _) = Gitignore::new(&gitignore_path); root_gitignore = new_root_gitignore; } @@ -272,7 +275,7 @@ impl Subscriber { let changed_files: HashSet<_> = trie .keys() .filter_map(|p| { - let p = AbsoluteSystemPathBuf::try_from(p.as_path()).ok()?; + let p = AbsoluteSystemPathBuf::new(p).ok()?; self.repo_root.anchor(p).ok() }) .filter(|p| { From c19392aa2e8722647abe299f807590ed33a6edca Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Tue, 7 May 2024 11:05:40 -0600 Subject: [PATCH 4/5] PR feedback --- crates/turborepo-lib/src/package_changes_watcher.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index c5874da764bf1..17c722f5110b0 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -237,13 +237,16 @@ impl Subscriber { loop { interval.tick().await; + let changed_files = { + let changed_files = self.changed_files.lock().await; + if changed_files.borrow().is_empty() { + continue; + } - let changed_files = self.changed_files.lock().await; - if changed_files.borrow().is_empty() { - continue; - } + changed_files.take() + }; - let ChangedFiles::Some(trie) = changed_files.take() else { + let ChangedFiles::Some(trie) = changed_files else { let _ = self .package_change_events_tx .send(PackageChangeEvent::Rediscover); From cb7c27543dd2d8d76ed2ea168dd66e3c3a9fb5b6 Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Tue, 7 May 2024 11:34:15 -0600 Subject: [PATCH 5/5] PR feedback --- crates/turborepo-lib/src/package_changes_watcher.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index 17c722f5110b0..e268d9a472d9b 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -278,7 +278,8 @@ impl Subscriber { let changed_files: HashSet<_> = trie .keys() .filter_map(|p| { - let p = AbsoluteSystemPathBuf::new(p).ok()?; + let p = AbsoluteSystemPathBuf::new(p) + .expect("file watching should return absolute paths"); self.repo_root.anchor(p).ok() }) .filter(|p| {