diff --git a/Cargo.lock b/Cargo.lock index b24ab31e15afc..a2a1ab0cca90e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2682,6 +2682,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-iterator" version = "0.7.0" @@ -5042,6 +5048,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec 1.13.1", +] + [[package]] name = "nix" version = "0.25.1" @@ -6314,6 +6329,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce082a9940a7ace2ad4a8b7d0b1eac6aa378895f18be598230c5f2284ac05426" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.4.6" @@ -11555,7 +11580,9 @@ dependencies = [ "futures 0.3.30", "git2 0.16.1", "libc", + "nibble_vec", "notify", + "radix_trie", "tempfile", "thiserror", "tokio", diff --git a/crates/turborepo-filewatch/Cargo.toml b/crates/turborepo-filewatch/Cargo.toml index fb163647c2901..594fb37f1914c 100644 --- a/crates/turborepo-filewatch/Cargo.toml +++ b/crates/turborepo-filewatch/Cargo.toml @@ -11,7 +11,10 @@ workspace = true [dependencies] futures = { version = "0.3.26" } +itertools = { workspace = true } +nibble_vec = "0.1.0" notify = { workspace = true } +radix_trie = "0.2.1" thiserror = "1.0.38" tokio = { workspace = true, features = ["full", "time"] } tracing = "0.1.37" diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 6466ae493174e..7da6f6d29981c 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -112,9 +112,27 @@ impl GlobSet { Ok(Self { include, exclude, - exclude_raw: BTreeSet::from_iter(raw_excludes.into_iter()), + exclude_raw: BTreeSet::from_iter(raw_excludes), }) } + + // delegates to from_raw, but filters the globs into inclusions and exclusions + // first + pub fn from_raw_unfiltered(raw: Vec) -> Result { + let (includes, excludes): (Vec<_>, Vec<_>) = { + let mut includes = vec![]; + let mut excludes = vec![]; + for pattern in raw { + if let Some(exclude) = pattern.strip_prefix('!') { + excludes.push(exclude.to_string()); + } else { + includes.push(pattern); + } + } + (includes, excludes) + }; + Self::from_raw(includes, excludes) + } } #[derive(Debug, Error)] diff --git a/crates/turborepo-filewatch/src/hash_watcher.rs b/crates/turborepo-filewatch/src/hash_watcher.rs index 6f1cfaaefb5f1..d70e9bd4e4c9a 100644 --- a/crates/turborepo-filewatch/src/hash_watcher.rs +++ b/crates/turborepo-filewatch/src/hash_watcher.rs @@ -1,36 +1,26 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - sync::Arc, + collections::{HashMap, HashSet}, + sync::atomic::{AtomicUsize, Ordering}, }; use notify::Event; +use radix_trie::{Trie, TrieCommon}; use thiserror::Error; use tokio::{ select, - sync::{ - broadcast, mpsc, oneshot, - watch::{self, error::RecvError}, - }, -}; -use tracing::debug; -use turbopath::{ - AbsoluteSystemPathBuf, 
AnchoredSystemPath, AnchoredSystemPathBuf, PathRelation, - RelativeUnixPathBuf, + sync::{broadcast, mpsc, oneshot, watch}, }; +use tracing::{debug, trace}; +use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf, PathRelation}; use turborepo_repository::discovery::DiscoveryResponse; use turborepo_scm::{package_deps::GitHashes, Error as SCMError, SCM}; -use crate::{ - cookies::{CookieWatcher, CookiedRequest}, - globwatcher::GlobSet, - package_watcher::DiscoveryData, - NotifyError, OptionalWatch, -}; +use crate::{globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError, OptionalWatch}; -struct HashWatcher { +pub struct HashWatcher { _exit_tx: oneshot::Sender<()>, _handle: tokio::task::JoinHandle<()>, - query_tx_lazy: OptionalWatch>, + query_tx: mpsc::Sender, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -45,6 +35,8 @@ pub enum Error { HashingError(String), #[error("file hashing is not available: {0}")] Unavailable(String), + #[error("package not found: {} {:?}", .0.package_path, .0.inputs)] + UnknownPackage(HashSpec), } // Communication errors that all funnel to Unavailable @@ -68,69 +60,154 @@ impl From> for Error { } impl HashWatcher { - fn new( + pub fn new( repo_root: AbsoluteSystemPathBuf, package_discovery: watch::Receiver>, file_events: OptionalWatch>>, - scm: &SCM, + scm: SCM, ) -> Self { let (exit_tx, exit_rx) = oneshot::channel(); - //let (query_tx, query_rx) = mpsc::channel(16); - let (query_tx_state, query_tx_lazy) = OptionalWatch::new(); - let process = - HashWatchProcess::new(repo_root, package_discovery, scm.clone(), query_tx_state); - let handle = tokio::spawn(process.watch(exit_rx, file_events)); + let (query_tx, query_rx) = mpsc::channel(16); + let subscriber = Subscriber::new(repo_root, package_discovery, scm, query_rx); + let handle = tokio::spawn(subscriber.watch(exit_rx, file_events)); Self { _exit_tx: exit_tx, _handle: handle, - query_tx_lazy, + query_tx, } } - async fn get_hash_blocking(&self, hash_spec: HashSpec) -> Result { + // Note that this does not wait for any sort of ready signal. The watching + // process won't respond until filewatching is ready, but there is no + // guarantee that package data or file hashing will be done before + // responding. Both package discovery and file hashing can fail depending on the + // state of the filesystem, so clients will need to be robust to receiving + // errors. + pub async fn get_file_hashes(&self, hash_spec: HashSpec) -> Result { let (tx, rx) = oneshot::channel(); - let query_tx = self.query_tx_lazy.clone().get().await?.clone(); - query_tx.send(Query::GetHash(hash_spec, tx)).await?; - let resp = rx.await?; - resp.map_err(|e| Error::HashingError(e)) + self.query_tx.send(Query::GetHash(hash_spec, tx)).await?; + rx.await? } } -struct HashWatchProcess { +struct Subscriber { repo_root: AbsoluteSystemPathBuf, package_discovery: watch::Receiver>, - query_tx_state: watch::Sender>>, + query_rx: mpsc::Receiver, scm: SCM, + next_version: AtomicUsize, } enum Query { - GetHash(HashSpec, oneshot::Sender>), - //CookiedGetHash(CookiedRequest<(HashSpec, oneshot::Sender>)>) + GetHash(HashSpec, oneshot::Sender>), } -// Version is a type that exists to stamp an asynchronous hash computation with -// a version so that we can ignore completion of outdated hash computations. -#[derive(Clone)] -struct Version(Arc<()>); +// Version is a type that exists to stamp an asynchronous hash computation +// with a version so that we can ignore completion of outdated hash +// computations. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+struct Version(usize);
-impl PartialEq for Version {
-    fn eq(&self, other: &Self) -> bool {
-        Arc::ptr_eq(&self.0, &other.0)
-    }
+enum HashState {
+    Hashes(GitHashes),
+    Pending(Version, Vec<oneshot::Sender<Result<GitHashes, Error>>>),
+    Unavailable(String),
 }
+// We use a radix_trie to store hashes so that we can quickly match a file path
+// to a package without having to iterate over the set of all packages. We
+// expect file changes to be the highest volume of events that this service
+// handles, so we want to ensure we're efficient in deciding if a given change
+// is relevant or not.
+//
+// Our Trie keys off of a String because of the orphan rule. Keys are required
+// to be TrieKey, but this crate doesn't own TrieKey or AnchoredSystemPathBuf.
+// We *could* implement TrieKey in AnchoredSystemPathBuf and avoid the String
+// conversion, if we decide we want to add the radix_trie dependency to
+// turbopath.
+struct FileHashes(Trie<String, HashMap<Option<GlobSet>, HashState>>);
+
+impl FileHashes {
+    fn new() -> Self {
+        Self(Trie::new())
+    }
-impl Eq for Version {}
+    fn drop_matching<F>(&mut self, mut f: F, reason: &str)
+    where
+        F: FnMut(&AnchoredSystemPath) -> bool,
+    {
+        let mut previous = std::mem::take(&mut self.0);
+
+        // radix_trie doesn't have an into_iter() implementation, so we have a slightly
+        // inefficient method for removing matching values. Fortunately, we only
+        // need to do this when the package layout changes. It's O(n) in the
+        // number of packages, on top of the trie internals.
+        let keys = previous.keys().map(|k| k.to_owned()).collect::<Vec<_>>();
+        for key in keys {
+            let previous_value = previous
+                .remove(&key)
+                .expect("this key was pulled from previous");
+            let path_key =
+                AnchoredSystemPath::new(&key).expect("keys are valid AnchoredSystemPaths");
+            if !f(path_key) {
+                // keep it, we didn't match the key.
+                self.0.insert(key, previous_value);
+            } else {
+                for state in previous_value.into_values() {
+                    if let HashState::Pending(_, txs) = state {
+                        for tx in txs {
+                            let _ = tx.send(Err(Error::Unavailable(reason.to_string())));
+                        }
+                    }
+                }
+            }
+        }
+    }
-impl Version {
-    fn new() -> Self {
-        Self(Arc::new(()))
+    fn get_package_path(&self, file_path: &AnchoredSystemPath) -> Option<&AnchoredSystemPath> {
+        self.0
+            .get_ancestor(file_path.as_str())
+            .and_then(|subtrie| subtrie.key())
+            .map(|package_path| {
+                AnchoredSystemPath::new(package_path).expect("keys are valid AnchoredSystemPaths")
+            })
+            .filter(|package_path| {
+                // handle scenarios where even though we've found an ancestor, it might be a
+                // sibling file or directory that starts with the same prefix,
+                // e.g. an update to apps/foo_decoy when the package path is
+                // apps/foo. Note that relation_to_path will return Parent for
+                // equivalent paths.
+                package_path.relation_to_path(file_path) == PathRelation::Parent
+            })
     }
-}
-enum HashState {
-    Hashes(GitHashes),
-    Pending(Version, Vec<oneshot::Sender<Result<GitHashes, String>>>),
-    Unavailable(String),
+    fn drain(&mut self, reason: &str) {
+        // funnel through drop_matching even though we could just swap with a new trie.
+        // We want to ensure we respond to any pending queries.
+ self.drop_matching(|_| true, reason); + } + + fn contains_key(&self, key: &HashSpec) -> bool { + self.0 + .get(key.package_path.as_str()) + .and_then(|states| states.get(&key.inputs)) + .is_some() + } + + fn insert(&mut self, key: HashSpec, value: HashState) { + if let Some(states) = self.0.get_mut(key.package_path.as_str()) { + states.insert(key.inputs, value); + } else { + let mut states = HashMap::new(); + states.insert(key.inputs, value); + self.0.insert(key.package_path.as_str().to_owned(), states); + } + } + + fn get_mut(&mut self, key: &HashSpec) -> Option<&mut HashState> { + self.0 + .get_mut(key.package_path.as_str()) + .and_then(|states| states.get_mut(&key.inputs)) + } } struct HashUpdate { @@ -139,18 +216,19 @@ struct HashUpdate { result: Result, } -impl HashWatchProcess { +impl Subscriber { fn new( repo_root: AbsoluteSystemPathBuf, package_discovery: watch::Receiver>, scm: SCM, - query_tx_state: watch::Sender>>, + query_rx: mpsc::Receiver, ) -> Self { Self { repo_root, package_discovery, scm, - query_tx_state, + query_rx, + next_version: AtomicUsize::new(0), } } @@ -167,18 +245,24 @@ impl HashWatchProcess { return; } }; - let (query_tx, mut query_rx) = mpsc::channel(16); let (hash_update_tx, mut hash_update_rx) = mpsc::channel::(16); - let mut hashes: HashMap = HashMap::new(); - - // We need to wait for the first non-null (error is ok) update from the package - // watcher before signalling ourselves as ready. - let package_data = self.package_discovery.borrow().to_owned(); - let mut ready = package_data.is_some(); - self.handle_package_data_update(package_data, &mut hashes, &hash_update_tx); - if ready { - let _ = self.query_tx_state.send(Some(query_tx.clone())); - } + let mut hashes = FileHashes::new(); + + let mut package_data = self.package_discovery.borrow().to_owned(); + self.handle_package_data_update(&package_data, &mut hashes, &hash_update_tx); + // We've gotten the ready signal from filewatching, and *some* state from + // package discovery, but there is no guarantee that package discovery + // is ready. This means that initial queries may be returned with errors + // until we've completed package discovery and then hashing. + // + // This is the main event loop for the hash watcher. It receives file events, + // updates to the package discovery state, and queries for hashes. It does + // not use filesystem cookies, as it is expected that the client will + // synchronize itself first before issuing a series of queries, one per + // task that in the task graph for a run, and we don't want to block on + // the filesystem for each query. This is analogous to running without + // the daemon, where we assume a static filesystem for the duration of + // generating task hashes. loop { select! 
{ biased; @@ -187,28 +271,26 @@ impl HashWatchProcess { return; }, _ = self.package_discovery.changed() => { - let package_data = self.package_discovery.borrow().to_owned(); - // If we weren't already ready, and this update is non-null, we are now ready - ready = !ready && package_data.is_some(); - self.handle_package_data_update(package_data, &mut hashes, &hash_update_tx); - if ready { - let _ = self.query_tx_state.send(Some(query_tx.clone())); - } + self.package_discovery.borrow().clone_into(&mut package_data); + self.handle_package_data_update(&package_data, &mut hashes, &hash_update_tx); }, file_event = file_events_recv.recv() => { match file_event { Ok(Ok(event)) => { - self.handle_file_event(event, &mut hashes); + self.handle_file_event(event, &mut hashes, &hash_update_tx); }, Ok(Err(e)) => { debug!("file watcher error: {:?}", e); + self.flush_and_rehash(&mut hashes, &hash_update_tx, &package_data, &format!("file watcher error: {e}")); }, Err(broadcast::error::RecvError::Closed) => { debug!("file watcher closed"); + hashes.drain("file watcher closed"); return; }, Err(broadcast::error::RecvError::Lagged(_)) => { debug!("file watcher lagged"); + self.flush_and_rehash(&mut hashes, &hash_update_tx, &package_data, "file watcher lagged"); }, } }, @@ -220,16 +302,30 @@ impl HashWatchProcess { unreachable!("hash update channel closed, but we have a live reference to it"); } }, - query = query_rx.recv() => { - if let Some(query) = query { - self.handle_query(query, &mut hashes); - } + Some(query) = self.query_rx.recv() => { + self.handle_query(query, &mut hashes); } } } } - fn handle_query(&self, query: Query, hashes: &mut HashMap) { + fn flush_and_rehash( + &self, + hashes: &mut FileHashes, + hash_update_tx: &mpsc::Sender, + package_data: &Option>, + reason: &str, + ) { + // We need to send errors to any RPCs that are pending, and having an empty set + // of hashes will cause handle_package_data_update to consider all + // packages as new and rehash them. + hashes.drain(reason); + self.handle_package_data_update(package_data, hashes, hash_update_tx); + } + + // We currently only support a single query, getting hashes for a given + // HashSpec. + fn handle_query(&self, query: Query, hashes: &mut FileHashes) { match query { Query::GetHash(spec, tx) => { if let Some(state) = hashes.get_mut(&spec) { @@ -241,17 +337,17 @@ impl HashWatchProcess { txs.push(tx); } HashState::Unavailable(e) => { - let _ = tx.send(Err(e.clone())); + let _ = tx.send(Err(Error::HashingError(e.clone()))); } } } else { - let _ = tx.send(Err(format!("package not found: {}", spec.package_path))); + let _ = tx.send(Err(Error::UnknownPackage(spec))); } } } } - fn handle_hash_update(&self, update: HashUpdate, hashes: &mut HashMap) { + fn handle_hash_update(&self, update: HashUpdate, hashes: &mut FileHashes) { let HashUpdate { spec, version, @@ -260,8 +356,9 @@ impl HashWatchProcess { // If we have a pending hash computation, update the state. If we don't, ignore // this update if let Some(state) = hashes.get_mut(&spec) { - // If we have a pending hash computation, update the state. If we don't, ignore - // this update + // We need mutable access to 'state' to update it, as well as being able to + // extract the pending state, so we need two separate if statements + // to pull the value apart. if let HashState::Pending(existing_version, pending_queries) = state { if *existing_version == version { match result { @@ -277,7 +374,7 @@ impl HashWatchProcess { let error = e.to_string(); for pending_query in pending_queries.drain(..) 
{ // We don't care if the client has gone away - let _ = pending_query.send(Err(error.clone())); + let _ = pending_query.send(Err(Error::HashingError(error.clone()))); } *state = HashState::Unavailable(error); } @@ -292,86 +389,73 @@ impl HashWatchProcess { spec: &HashSpec, hash_update_tx: &mpsc::Sender, ) -> Version { - let version = Version::new(); - let version_copy = version.clone(); + let version = Version(self.next_version.fetch_add(1, Ordering::SeqCst)); let tx = hash_update_tx.clone(); let spec = spec.clone(); let repo_root = self.repo_root.clone(); let scm = self.scm.clone(); + // Package hashing involves blocking IO calls, so run on a blocking thread. tokio::task::spawn_blocking(move || { let telemetry = None; let inputs = spec.inputs.as_ref().map(|globs| globs.as_inputs()); let result = scm.get_package_file_hashes( &repo_root, &spec.package_path, - inputs - .as_ref() - .map(|inputs| inputs.as_slice()) - .unwrap_or_default(), + inputs.as_deref().unwrap_or_default(), telemetry, ); - //let result = self.hash_package(&spec_copy); let _ = tx.blocking_send(HashUpdate { spec, - version: version_copy, + version, result, }); }); version } - fn handle_file_event(&self, event: Event, hashes: &mut HashMap) { - let mut changed_packages: HashSet = HashSet::new(); - 'change_path: for path in event.paths { + fn handle_file_event( + &self, + event: Event, + hashes: &mut FileHashes, + hash_update_tx: &mpsc::Sender, + ) { + let mut changed_packages: HashSet = HashSet::new(); + for path in event.paths { let path = AbsoluteSystemPathBuf::try_from(path).expect("event path is a valid path"); let repo_relative_change_path = self .repo_root .anchor(&path) .expect("event path is in the repository"); - // TODO: better data structure to make this more efficient - for hash_spec in changed_packages.iter() { - if hash_spec - .package_path - .relation_to_path(&repo_relative_change_path) - == PathRelation::Parent - { - // We've already seen a change in a parent package, no need to check this one - continue 'change_path; - } - } - for hash_spec in hashes.keys() { - if hash_spec - .package_path - .relation_to_path(&repo_relative_change_path) - == PathRelation::Parent - { - changed_packages.insert(hash_spec.clone()); - } + // If this change is not relevant to a package, ignore it + trace!("file change at {:?}", repo_relative_change_path); + if let Some(package_path) = hashes.get_package_path(&repo_relative_change_path) { + // We have a file change in a package, and we haven't seen this package yet. + // Queue it for rehashing. + // TODO: further qualification. Which sets of inputs? Is this file .gitignored? + // We are somewhat saved here by deferring to the SCM to do the hashing. A + // change to a gitignored file will trigger a re-hash, but won't + // actually affect what the hash is. 
+ trace!("package changed: {:?}", package_path); + changed_packages.insert(package_path.to_owned()); + } else { + trace!("Ignoring change to {repo_relative_change_path}"); } } - // let package_path = self.repo_root.anchor(&path).expect("event path is - // in the repository"); let spec = HashSpec { - // package_path, - // inputs: None, - // }; - // if let Some(state) = hashes.get_mut(&spec) { - // match state { - // HashState::Pending(count) => { - // *count += 1; - // }, - // HashState::Hash(_) => { - // *state = HashState::Pending(1); - // }, - // } - // } else { - // hashes.insert(spec, HashState::Pending(1)); - // } + // TODO: handle different sets of inputs + for package_path in changed_packages { + let spec = HashSpec { + package_path, + inputs: None, + }; + let version = self.queue_package_hash(&spec, hash_update_tx); + hashes.insert(spec, HashState::Pending(version, vec![])); + } } fn handle_package_data_update( &self, - package_data: Option>, - hashes: &mut HashMap, + package_data: &Option>, + hashes: &mut FileHashes, hash_update_tx: &mpsc::Sender, ) { debug!("handling package data {:?}", package_data); @@ -389,7 +473,10 @@ impl HashWatchProcess { })); // We have new package data. Drop any packages we don't need anymore, add any // new ones - hashes.retain(|spec, _| package_paths.contains(&spec.package_path)); + hashes.drop_matching( + |package_path| !package_paths.contains(package_path), + "package was removed", + ); for package_path in package_paths { let spec = HashSpec { package_path, @@ -404,13 +491,7 @@ impl HashWatchProcess { } None | Some(Err(_)) => { // package data invalidated, flush everything - for (_, state) in hashes.drain() { - if let HashState::Pending(_, txs) = state { - for tx in txs { - let _ = tx.send(Err("package discovery is unavailable".to_string())); - } - } - } + hashes.drain("package discovery is unavailable"); } } } @@ -418,13 +499,20 @@ impl HashWatchProcess { #[cfg(test)] mod tests { - use std::time::{Duration, Instant}; + use std::{ + assert_matches::assert_matches, + ops::Deref, + time::{Duration, Instant}, + }; use git2::Repository; use tempfile::{tempdir, TempDir}; - use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPathBuf}; + use turbopath::{ + AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf, RelativeUnixPathBuf, + }; use turborepo_scm::{package_deps::GitHashes, SCM}; + use super::{FileHashes, HashState}; use crate::{ cookies::CookieWriter, hash_watcher::{HashSpec, HashWatcher}, @@ -529,10 +617,28 @@ mod tests { (tmp, repo, repo_root) } + fn create_fixture_branch(repo: &Repository, repo_root: &AbsoluteSystemPath) { + // create a branch that deletes bar-file and adds baz-file to the bar package + let bar_dir = repo_root.join_components(&["packages", "bar"]); + bar_dir.join_component("bar-file").remove().unwrap(); + bar_dir + .join_component("baz-file") + .create_with_contents("baz file contents") + .unwrap(); + let current_commit = repo + .head() + .ok() + .map(|r| r.peel_to_commit().unwrap()) + .unwrap(); + repo.branch("test-branch", ¤t_commit, false).unwrap(); + repo.set_head("refs/heads/test-branch").unwrap(); + commit_all(&repo); + } + #[tokio::test] #[tracing_test::traced_test] async fn test_basic_file_changes() { - let (_tmp, repo, repo_root) = setup_fixture(); + let (_tmp, _repo, repo_root) = setup_fixture(); let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); @@ -548,23 +654,25 @@ mod tests { let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); let 
package_discovery = package_watcher.watch_discovery(); let hash_watcher = - HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), &scm); + HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), scm); let foo_path = repo_root.join_components(&["packages", "foo"]); - - let foo_hash = hash_watcher - .get_hash_blocking(HashSpec { + // We need to give filewatching time to do the initial scan, + // but this should resolve in short order to the expected value. + retry_get_hash( + &hash_watcher, + HashSpec { package_path: repo_root.anchor(&foo_path).unwrap(), inputs: None, - }) - .await - .unwrap(); - let expected = make_expected(vec![ - ("foo-file", "9317666a2e7b729b740c706ab79724952c97bde4"), - ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), - (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), - ]); - assert_eq!(foo_hash, expected); + }, + Duration::from_secs(2), + make_expected(vec![ + ("foo-file", "9317666a2e7b729b740c706ab79724952c97bde4"), + ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), + (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), + ]), + ) + .await; // update foo-file let foo_file_path = repo_root.join_components(&["packages", "foo", "foo-file"]); @@ -579,12 +687,128 @@ mod tests { }, Duration::from_secs(2), make_expected(vec![ - ("foo-file", "new-hash"), + ("foo-file", "5f6796bbd23dcdc9d30d07a2d8a4817c34b7f1e7"), ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), ]), ) .await; + + // update files in dist/ and out/ and foo-file + // verify we don't get hashes for the gitignored files + repo_root + .join_components(&["packages", "foo", "out", "some-file"]) + .create_with_contents("an ignored file") + .unwrap(); + repo_root + .join_components(&["packages", "foo", "dist", "some-other-file"]) + .create_with_contents("an ignored file") + .unwrap(); + foo_file_path + .create_with_contents("even more foo-file contents") + .unwrap(); + retry_get_hash( + &hash_watcher, + HashSpec { + package_path: repo_root.anchor(&foo_path).unwrap(), + inputs: None, + }, + Duration::from_secs(2), + make_expected(vec![ + ("foo-file", "0cb73634538618658f092cd7a3a373c243513a6a"), + ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), + (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), + ]), + ) + .await; + } + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_switch_branch() { + let (_tmp, repo, repo_root) = setup_fixture(); + + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + + let recv = watcher.watch(); + let cookie_writer = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + + let scm = SCM::new(&repo_root); + assert!(!scm.is_manual()); + let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); + let package_discovery = package_watcher.watch_discovery(); + let hash_watcher = + HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), scm); + + let bar_path = repo_root.join_components(&["packages", "bar"]); + + // We need to give filewatching time to do the initial scan, + // but this should resolve in short order to the expected value. 
+ retry_get_hash( + &hash_watcher, + HashSpec { + package_path: repo_root.anchor(&bar_path).unwrap(), + inputs: None, + }, + Duration::from_secs(2), + make_expected(vec![ + ("bar-file", "b9bdb1e4875f7397b3f68c104bc249de0ecd3f8e"), + ("package.json", "b39117e03f0dbe217b957f58a2ad78b993055088"), + ]), + ) + .await; + + create_fixture_branch(&repo, &repo_root); + + retry_get_hash( + &hash_watcher, + HashSpec { + package_path: repo_root.anchor(&bar_path).unwrap(), + inputs: None, + }, + Duration::from_secs(2), + make_expected(vec![ + ("baz-file", "a5395ccf1b8966f3ea805aff0851eac13acb3540"), + ("package.json", "b39117e03f0dbe217b957f58a2ad78b993055088"), + ]), + ) + .await; + } + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_non_existent_package() { + let (_tmp, _repo, repo_root) = setup_fixture(); + + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + + let recv = watcher.watch(); + let cookie_writer = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + + let scm = SCM::new(&repo_root); + assert!(!scm.is_manual()); + let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); + let package_discovery = package_watcher.watch_discovery(); + let hash_watcher = + HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), scm); + + let non_existent_path = repo_root.join_components(&["packages", "non-existent"]); + let relative_non_existent_path = repo_root.anchor(&non_existent_path).unwrap(); + let result = hash_watcher + .get_file_hashes(HashSpec { + package_path: relative_non_existent_path.clone(), + inputs: None, + }) + .await; + assert_matches!(result, Err(crate::hash_watcher::Error::UnknownPackage(unknown_spec)) if unknown_spec.package_path == relative_non_existent_path); } // we don't have a signal for when hashing is complete after having made a file @@ -600,7 +824,7 @@ mod tests { let mut error = None; let mut last_value = None; while Instant::now() < deadline { - match hash_watcher.get_hash_blocking(spec.clone()).await { + match hash_watcher.get_file_hashes(spec.clone()).await { Ok(hashes) => { if hashes == expected { return; @@ -626,4 +850,42 @@ mod tests { } map } + + #[test] + fn test_file_hashes_ancestor() { + let mut hashes = FileHashes::new(); + + let root = AnchoredSystemPathBuf::try_from("").unwrap(); + let foo_path = root.join_components(&["apps", "foo"]); + let foo_spec = HashSpec { + package_path: foo_path.clone(), + inputs: None, + }; + hashes.insert(foo_spec, HashState::Hashes(GitHashes::new())); + let foo_bar_path = root.join_components(&["apps", "foobar"]); + let foo_bar_spec = HashSpec { + package_path: foo_bar_path.clone(), + inputs: None, + }; + hashes.insert(foo_bar_spec, HashState::Hashes(GitHashes::new())); + + let foo_candidate = foo_path.join_component("README.txt"); + let result = hashes.get_package_path(&foo_candidate).unwrap(); + assert_eq!(result, foo_path.deref()); + + let foo_bar_candidate = foo_bar_path.join_component("README.txt"); + let result = hashes.get_package_path(&foo_bar_candidate).unwrap(); + assert_eq!(result, foo_bar_path.deref()); + + // try a path that is a *sibling* of a package, but not itself a package + let sibling = root.join_components(&["apps", "sibling"]); + let result = hashes.get_package_path(&sibling); + assert!(result.is_none()); + + // try a path that is a *sibling* of a package, but not itself a package, but + // starts with the prefix of a package + let decoy = root.join_components(&["apps", 
"foodecoy"]); + let result = hashes.get_package_path(&decoy); + assert!(result.is_none()); + } } diff --git a/crates/turborepo-paths/src/anchored_system_path.rs b/crates/turborepo-paths/src/anchored_system_path.rs index c80d18f78bef4..31cd3b855d906 100644 --- a/crates/turborepo-paths/src/anchored_system_path.rs +++ b/crates/turborepo-paths/src/anchored_system_path.rs @@ -6,7 +6,7 @@ use serde::Serialize; use crate::{AnchoredSystemPathBuf, PathError, PathRelation, RelativeUnixPathBuf}; -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Hash)] #[serde(transparent)] pub struct AnchoredSystemPath(Utf8Path); @@ -153,3 +153,30 @@ impl AnchoredSystemPath { } } } + +#[cfg(test)] +mod tests { + use test_case::test_case; + + use crate::{AnchoredSystemPathBuf, PathRelation}; + + #[test_case(&["a", "b"], &["a", "b"], PathRelation::Parent ; "equal paths return parent")] + #[test_case(&["a"], &["a", "b"], PathRelation::Parent ; "a is a parent of a/b")] + #[test_case(&["a", "b"], &["a"], PathRelation::Child ; "a/b is a child of a")] + #[test_case(&["a", "b"], &["a", "c"], PathRelation::Divergent ; "a/b and a/c are divergent")] + fn test_path_relation( + abs_path_components: &[&str], + other_components: &[&str], + expected: PathRelation, + ) { + let abs_path = AnchoredSystemPathBuf::try_from("") + .unwrap() + .join_components(abs_path_components); + let other_path = AnchoredSystemPathBuf::try_from("") + .unwrap() + .join_components(other_components); + + let relation = abs_path.relation_to_path(&other_path); + assert_eq!(relation, expected); + } +}