From 09792fa7581a329fa5fe6dc3149a21325c560c6b Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Tue, 30 Apr 2024 16:32:29 -0400 Subject: [PATCH] Split package changes watcher into an event processing loop and a change mapper loop --- Cargo.lock | 1 + Cargo.toml | 135 ++++---- crates/turborepo-filewatch/Cargo.toml | 2 +- crates/turborepo-lib/Cargo.toml | 1 + .../src/package_changes_watcher.rs | 297 +++++++++++------- 5 files changed, 259 insertions(+), 177 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7927028539d99..81b7407b900973 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11648,6 +11648,7 @@ dependencies = [ "pprof", "pretty_assertions", "prost 0.12.3", + "radix_trie", "rand 0.8.5", "rayon", "regex", diff --git a/Cargo.toml b/Cargo.toml index 1d5e62f472e00b..cd594e89098a27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,56 +2,56 @@ resolver = "2" members = [ - "crates/node-file-trace", - "crates/tower-uds", - "crates/turbo-tasks*", - "crates/turbopack*", - "crates/turborepo*", - "packages/turbo-repository/rust", - "xtask", + "crates/node-file-trace", + "crates/tower-uds", + "crates/turbo-tasks*", + "crates/turbopack*", + "crates/turborepo*", + "packages/turbo-repository/rust", + "xtask", ] default-members = [ - "crates/node-file-trace", - "crates/turbo-tasks", - "crates/turbo-tasks-auto-hash-map", - "crates/turbo-tasks-build", - "crates/turbo-tasks-bytes", - "crates/turbo-tasks-env", - "crates/turbo-tasks-fetch", - "crates/turbo-tasks-fs", - "crates/turbo-tasks-hash", - "crates/turbo-tasks-macros-shared", - "crates/turbo-tasks-macros-tests", - "crates/turbo-tasks-malloc", - "crates/turbo-tasks-memory", - "crates/turbo-tasks-signposter", - "crates/turbo-tasks-signposter-sys", - "crates/turbo-tasks-testing", - "crates/turbo-tasks-tracing-signpost", - "crates/turbopack", - "crates/turbopack-bench", - "crates/turbopack-cli-utils", - "crates/turbopack-core", - "crates/turbopack-create-test-app", - "crates/turbopack-css", - "crates/turbopack-browser", - "crates/turbopack-dev-server", - "crates/turbopack-ecmascript", - "crates/turbopack-env", - "crates/turbopack-json", - "crates/turbopack-mdx", - "crates/turbopack-node", - "crates/turbopack-resolve", - "crates/turbopack-static", - "crates/turbopack-swc-ast-explorer", - "crates/turbopack-swc-utils", - "crates/turbopack-test-utils", - "crates/turbopack-tests", - "crates/turbopack-trace-server", - "crates/turbopack-trace-utils", - "crates/turbopack-wasm", - "xtask", + "crates/node-file-trace", + "crates/turbo-tasks", + "crates/turbo-tasks-auto-hash-map", + "crates/turbo-tasks-build", + "crates/turbo-tasks-bytes", + "crates/turbo-tasks-env", + "crates/turbo-tasks-fetch", + "crates/turbo-tasks-fs", + "crates/turbo-tasks-hash", + "crates/turbo-tasks-macros-shared", + "crates/turbo-tasks-macros-tests", + "crates/turbo-tasks-malloc", + "crates/turbo-tasks-memory", + "crates/turbo-tasks-signposter", + "crates/turbo-tasks-signposter-sys", + "crates/turbo-tasks-testing", + "crates/turbo-tasks-tracing-signpost", + "crates/turbopack", + "crates/turbopack-bench", + "crates/turbopack-cli-utils", + "crates/turbopack-core", + "crates/turbopack-create-test-app", + "crates/turbopack-css", + "crates/turbopack-browser", + "crates/turbopack-dev-server", + "crates/turbopack-ecmascript", + "crates/turbopack-env", + "crates/turbopack-json", + "crates/turbopack-mdx", + "crates/turbopack-node", + "crates/turbopack-resolve", + "crates/turbopack-static", + "crates/turbopack-swc-ast-explorer", + "crates/turbopack-swc-utils", + "crates/turbopack-test-utils", + "crates/turbopack-tests", + "crates/turbopack-trace-server", + "crates/turbopack-trace-utils", + "crates/turbopack-wasm", + "xtask", ] [workspace.metadata.groups] @@ -60,18 +60,18 @@ default-members = [ turborepo-libraries = ["path:crates/turborepo-*"] turborepo = ["path:crates/turborepo*"] turbopack = [ - "path:crates/turbopack*", - "path:crates/turbo-tasks*", - "path:crates/node-file-trace", + "path:crates/turbopack*", + "path:crates/turbo-tasks*", + "path:crates/node-file-trace", ] # List of the packages can be compiled against wasm32-wasi target. turbopack-wasi = [ - "path:crates/turbo-tasks-auto-hash-map", - "path:crates/turbo-tasks-hash", - "path:crates/turbo-tasks-macro", - "path:crates/turbo-tasks-macros-shared", - "path:crates/turbo-tasks-build", + "path:crates/turbo-tasks-auto-hash-map", + "path:crates/turbo-tasks-hash", + "path:crates/turbo-tasks-macro", + "path:crates/turbo-tasks-macros-shared", + "path:crates/turbo-tasks-build", ] [workspace.lints.clippy] @@ -108,8 +108,8 @@ modularize_imports = { version = "0.68.9" } styled_components = { version = "0.96.8" } styled_jsx = { version = "0.73.13" } swc_core = { version = "0.90.33", features = [ - "ecma_loader_lru", - "ecma_loader_parking_lot", + "ecma_loader_lru", + "ecma_loader_parking_lot", ] } swc_emotion = { version = "0.72.8" } swc_relay = { version = "0.44.8" } @@ -189,7 +189,7 @@ turborepo-vt100 = { path = "crates/turborepo-vt100" } reqwest = { version = "=0.11.17", default-features = false } chromiumoxide = { version = "0.5.0", features = [ - "tokio-runtime", + "tokio-runtime", ], default-features = false } # For matching on errors from chromiumoxide. Keep in # sync with chromiumoxide's tungstenite requirement. @@ -198,8 +198,8 @@ tungstenite = "0.18.0" anyhow = "1.0.69" assert_cmd = "2.0.8" async-compression = { version = "0.3.13", default-features = false, features = [ - "gzip", - "tokio", + "gzip", + "tokio", ] } async-trait = "0.1.64" atty = "0.2.14" @@ -235,9 +235,9 @@ indoc = "2.0.0" itertools = "0.10.5" lazy_static = "1.4.0" lightningcss = { version = "1.0.0-alpha.50", features = [ - "serde", - "visitor", - "into_owned", + "serde", + "visitor", + "into_owned", ] } mime = "0.3.16" nohash-hasher = "0.2.0" @@ -259,6 +259,7 @@ pretty_assertions = "1.3.0" proc-macro2 = "1.0.79" qstring = "0.7.2" quote = "1.0.23" +radix_trie = "0.2.1" rand = "0.8.5" ratatui = "0.26.1" regex = "1.7.0" @@ -272,10 +273,10 @@ serde_with = "2.3.2" serde_yaml = "0.9.17" sha2 = "0.10.6" smallvec = { version = "1.13.1", features = [ - "serde", - "const_generics", - "union", - "const_new", + "serde", + "const_generics", + "union", + "const_new", ] } sourcemap = "8.0.1" syn = "1.0.107" diff --git a/crates/turborepo-filewatch/Cargo.toml b/crates/turborepo-filewatch/Cargo.toml index 594fb37f1914c8..3b8285a80c9dbe 100644 --- a/crates/turborepo-filewatch/Cargo.toml +++ b/crates/turborepo-filewatch/Cargo.toml @@ -14,7 +14,7 @@ futures = { version = "0.3.26" } itertools = { workspace = true } nibble_vec = "0.1.0" notify = { workspace = true } -radix_trie = "0.2.1" +radix_trie = { workspace = true } thiserror = "1.0.38" tokio = { workspace = true, features = ["full", "time"] } tracing = "0.1.37" diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 3bb9525c3279d3..041a1fd31e9a59 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -81,6 +81,7 @@ pprof = { version = "0.12.1", features = [ "frame-pointer", ], optional = true } prost = "0.12.3" +radix_trie = { workspace = true } rand = { workspace = true } rayon = "1.7.0" regex.workspace = true diff --git a/crates/turborepo-lib/src/package_changes_watcher.rs b/crates/turborepo-lib/src/package_changes_watcher.rs index ef1f25cf3fd1ee..96051477668c0e 100644 --- a/crates/turborepo-lib/src/package_changes_watcher.rs +++ b/crates/turborepo-lib/src/package_changes_watcher.rs @@ -1,8 +1,9 @@ -use std::collections::HashSet; +use std::{cell::RefCell, collections::HashSet, ops::DerefMut, path::PathBuf}; use ignore::gitignore::Gitignore; use notify::Event; -use tokio::sync::{broadcast, oneshot}; +use radix_trie::{Trie, TrieCommon}; +use tokio::sync::{broadcast, oneshot, Mutex}; use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf}; use turborepo_filewatch::{NotifyError, OptionalWatch}; use turborepo_repository::{ @@ -53,8 +54,29 @@ impl PackageChangesWatcher { } } +enum ChangedFiles { + All, + Some(Trie), +} + +impl ChangedFiles { + fn is_empty(&self) -> bool { + match self { + ChangedFiles::All => false, + ChangedFiles::Some(trie) => trie.is_empty(), + } + } +} + +impl Default for ChangedFiles { + fn default() -> Self { + ChangedFiles::Some(Trie::new()) + } +} + struct Subscriber { file_events_lazy: OptionalWatch>>, + changed_files: Mutex>, repo_root: AbsoluteSystemPathBuf, package_change_events_tx: broadcast::Sender, } @@ -108,11 +130,12 @@ impl Subscriber { Subscriber { repo_root, file_events_lazy, + changed_files: Default::default(), package_change_events_tx, } } - async fn initialize_repo_state(&mut self) -> Option { + async fn initialize_repo_state(&self) -> Option<(RepoState, Gitignore)> { let Ok(root_package_json) = PackageJson::load(&self.repo_root.join_component("package.json")) else { @@ -128,6 +151,9 @@ impl Subscriber { ) .ok(); + let gitignore_path = self.repo_root.join_component(".gitignore"); + let (root_gitignore, _) = Gitignore::new(&gitignore_path); + let Ok(pkg_dep_graph) = PackageGraphBuilder::new(&self.repo_root, root_package_json) .build() .await @@ -136,24 +162,59 @@ impl Subscriber { return None; }; - Some(RepoState { - root_turbo_json, - pkg_dep_graph, - }) + Some(( + RepoState { + root_turbo_json, + pkg_dep_graph, + }, + root_gitignore, + )) } async fn watch(mut self, exit_rx: oneshot::Receiver<()>) { - let process = async { + let Ok(mut file_events) = self.file_events_lazy.get().await.map(|r| r.resubscribe()) else { + // if we get here, it means that file watching has not started, so we should + // just report that the package watcher is not available + tracing::debug!("file watching shut down, package watcher not available"); + return; + }; + + let event_fut = async { + loop { + match file_events.recv().await { + Ok(Ok(Event { paths, .. })) => { + if let ChangedFiles::Some(trie) = + self.changed_files.lock().await.borrow_mut().deref_mut() + { + for path in paths { + trie.insert(path, ()); + } + } + } + Ok(Err(err)) => { + tracing::error!("file event error: {:?}", err); + break; + } + Err(broadcast::error::RecvError::Lagged(_)) => { + tracing::warn!("file event lagged"); + // Lagged essentially means we're not keeping up with + // the file events, so + // we can catch up by declaring all files changed + *self.changed_files.lock().await.borrow_mut() = ChangedFiles::All; + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("file event channel closed"); + break; + } + } + } + }; + + let changes_fut = async { let root_pkg = WorkspacePackage::root(); - let Ok(mut file_events) = self.file_events_lazy.get().await.map(|r| r.resubscribe()) - else { - // if we get here, it means that file watching has not started, so we should - // just report that the package watcher is not available - tracing::debug!("file watching shut down, package watcher not available"); - return; - }; - let Some(mut repo_state) = self.initialize_repo_state().await else { + let Some((mut repo_state, mut root_gitignore)) = self.initialize_repo_state().await + else { return; }; @@ -167,20 +228,50 @@ impl Subscriber { self.package_change_events_tx .send(PackageChangeEvent::Rediscover) .ok(); + let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); loop { - match file_events.recv().await { - Ok(Ok(Event { paths, .. })) => { - // No point in raising an error for an invalid .gitignore - // This is slightly incorrect because we should also search for the - // .gitignore files in the workspaces. - let (root_gitignore, _) = - Gitignore::new(&self.repo_root.join_component(".gitignore")); - - let changed_files: HashSet<_> = paths - .into_iter() + interval.tick().await; + + let changed_files = self.changed_files.lock().await; + if changed_files.borrow().is_empty() { + continue; + } + + let changes = match changed_files.take() { + ChangedFiles::All => { + let _ = self + .package_change_events_tx + .send(PackageChangeEvent::Rediscover); + + match self.initialize_repo_state().await { + Some((new_repo_state, new_gitignore)) => { + repo_state = new_repo_state; + root_gitignore = new_gitignore; + change_mapper = match repo_state.get_change_mapper() { + Some(change_mapper) => change_mapper, + None => { + break; + } + }; + } + None => { + break; + } + } + change_mapper.changed_packages(HashSet::new(), None) + } + ChangedFiles::Some(trie) => { + let gitignore_path = self.repo_root.join_component(".gitignore"); + if trie.get(gitignore_path.as_std_path()).is_some() { + let (new_root_gitignore, _) = Gitignore::new(&gitignore_path); + root_gitignore = new_root_gitignore; + } + + let changed_files: HashSet<_> = trie + .keys() .filter_map(|p| { - let p = AbsoluteSystemPathBuf::try_from(p).ok()?; + let p = AbsoluteSystemPathBuf::try_from(p.as_path()).ok()?; self.repo_root.anchor(p).ok() }) .filter(|p| { @@ -189,99 +280,84 @@ impl Subscriber { }) .collect(); - let changes = change_mapper.changed_packages(changed_files.clone(), None); - - match changes { - Ok(PackageChanges::All) => { - // We tell the client that we need to rediscover the packages, i.e. - // all bets are off, just re-run everything - let _ = self - .package_change_events_tx - .send(PackageChangeEvent::Rediscover); - match self.initialize_repo_state().await { - Some(new_repo_state) => { - repo_state = new_repo_state; - change_mapper = match repo_state.get_change_mapper() { - Some(change_mapper) => change_mapper, - None => { - break; - } - }; - } - None => { - break; - } - } - } - Ok(PackageChanges::Some(mut changed_pkgs)) => { - if !changed_pkgs.is_empty() { - tracing::debug!( - "changed files: {:?} changed packages: {:?}", - changed_files, - changed_pkgs - ); - } + if changed_files.is_empty() { + continue; + } + tracing::warn!("changed_files: {:?}", changed_files); - // If the root package has changed, we only send it if we have root - // tasks. Otherwise it's not worth sending as it will only - // pollute up the output logs - if changed_pkgs.contains(&root_pkg) { - let has_root_tasks = repo_state - .root_turbo_json - .as_ref() - .map_or(false, |turbo| turbo.has_root_tasks()); - if !has_root_tasks { - changed_pkgs.remove(&root_pkg); - } - } + change_mapper.changed_packages(changed_files.clone(), None) + } + }; - for pkg in changed_pkgs { - let _ = self.package_change_events_tx.send( - PackageChangeEvent::Package { - name: pkg.name.clone(), - }, - ); - } - } - Err(err) => { - // Log the error, rediscover the packages and try again - tracing::error!("error: {:?}", err); - - let _ = self - .package_change_events_tx - .send(PackageChangeEvent::Rediscover); - match self.initialize_repo_state().await { - Some(new_repo_state) => { - repo_state = new_repo_state; - change_mapper = match repo_state.get_change_mapper() { - Some(change_mapper) => change_mapper, - None => { - break; - } - } - } + tracing::warn!("changes: {:?}", changes); + + match changes { + Ok(PackageChanges::All) => { + // We tell the client that we need to rediscover the packages, i.e. + // all bets are off, just re-run everything + let _ = self + .package_change_events_tx + .send(PackageChangeEvent::Rediscover); + match self.initialize_repo_state().await { + Some((new_repo_state, new_gitignore)) => { + repo_state = new_repo_state; + root_gitignore = new_gitignore; + change_mapper = match repo_state.get_change_mapper() { + Some(change_mapper) => change_mapper, None => { break; } - } + }; + } + None => { + break; } } } - Ok(Err(err)) => { - tracing::error!("file event error: {:?}", err); - break; + Ok(PackageChanges::Some(mut filtered_pkgs)) => { + // If the root package has changed, we only send it if we have root + // tasks. Otherwise it's not worth sending as it will only + // pollute up the output logs + if filtered_pkgs.contains(&root_pkg) { + let has_root_tasks = repo_state + .root_turbo_json + .as_ref() + .map_or(false, |turbo| turbo.has_root_tasks()); + if !has_root_tasks { + filtered_pkgs.remove(&root_pkg); + } + } + + for pkg in filtered_pkgs { + let _ = + self.package_change_events_tx + .send(PackageChangeEvent::Package { + name: pkg.name.clone(), + }); + } } - Err(broadcast::error::RecvError::Lagged(_)) => { - tracing::warn!("file event lagged"); - // Lagged essentially means we're not keeping up with the file events, so - // we can catch up by sending a rediscover event + Err(err) => { + // Log the error, rediscover the packages and try again + tracing::error!("error: {:?}", err); + let _ = self .package_change_events_tx .send(PackageChangeEvent::Rediscover); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::debug!("file event channel closed"); - break; + match self.initialize_repo_state().await { + Some((new_repo_state, new_gitignore)) => { + repo_state = new_repo_state; + root_gitignore = new_gitignore; + change_mapper = match repo_state.get_change_mapper() { + Some(change_mapper) => change_mapper, + None => { + break; + } + } + } + None => { + break; + } + } } } } @@ -292,7 +368,10 @@ impl Subscriber { _ = exit_rx => { tracing::debug!("exiting package changes watcher due to signal"); }, - _ = process => { + _ = event_fut => { + tracing::debug!("exiting package changes watcher due to file event end"); + }, + _ = changes_fut => { tracing::debug!("exiting package changes watcher due to process end"); } }