From c0bb6816ac92d53d95013780e374acba17b719b2 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Mar 2023 19:32:41 +0100 Subject: [PATCH] only use recursive watchers on macOS and windows (#4100) ### Description Only windows and macOS support real recursive file watchers, other OS emulate it by walking the directory structure and watching all directories. But that might be really slow and use up a lot of watchers, so we don't want that. Instead we know exactly which directories are used and can watch selectively directories when they are accessed. * only use recursive watchers on macOS and windows * fallback to non-recursively watch read directories on other OS Note that this implementation still has some bugs when renaming folders, but we can probably figure out these edge cases later... --- Cargo.lock | 1 + crates/turbo-tasks-fs/Cargo.toml | 1 + crates/turbo-tasks-fs/src/lib.rs | 138 +++++++++++++++++++++++++++---- 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33667c781833a..1f7917909ba63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7451,6 +7451,7 @@ dependencies = [ "bytes", "concurrent-queue", "criterion", + "dashmap", "dunce", "futures", "futures-retry", diff --git a/crates/turbo-tasks-fs/Cargo.toml b/crates/turbo-tasks-fs/Cargo.toml index 8ec5735e09472..33f00d10481e3 100644 --- a/crates/turbo-tasks-fs/Cargo.toml +++ b/crates/turbo-tasks-fs/Cargo.toml @@ -18,6 +18,7 @@ auto-hash-map = { workspace = true } bitflags = "1.3.2" bytes = "1.1.0" concurrent-queue = { workspace = true } +dashmap = { workspace = true } dunce = { workspace = true } futures = { workspace = true } futures-retry = { workspace = true } diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index a5e5b5517efae..1b97d68e03bfa 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -29,7 +29,7 @@ use std::{ path::{Path, PathBuf, MAIN_SEPARATOR}, sync::{ mpsc::{channel, RecvError, TryRecvError}, - Arc, Mutex, + Arc, Mutex, MutexGuard, }, time::Duration, }; @@ -80,6 +80,64 @@ pub trait FileSystem: ValueToString { fn metadata(&self, fs_path: FileSystemPathVc) -> FileMetaVc; } +#[derive(Default)] +struct DiskWatcher { + watcher: Mutex>, + /// Keeps track of which directories are currently watched. This is only + /// used on a OS that doesn't support recursive watching. + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + watching: dashmap::DashSet, +} + +impl DiskWatcher { + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + if self.watching.contains(dir_path) { + let mut watcher = self.watcher.lock().unwrap(); + self.start_watching(&mut watcher, dir_path, &root_path)?; + } + Ok(()) + } + + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> { + if self.watching.contains(dir_path) { + return Ok(()); + } + let mut watcher = self.watcher.lock().unwrap(); + if self.watching.insert(dir_path.to_path_buf()) { + self.start_watching(&mut watcher, dir_path, root_path)?; + } + Ok(()) + } + + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + fn start_watching( + &self, + watcher: &mut MutexGuard>, + dir_path: &Path, + root_path: &Path, + ) -> Result<()> { + if let Some(watcher) = watcher.as_mut() { + let mut path = dir_path; + while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) { + if path == root_path { + return Err(err).context(format!( + "Unable to watch {} (tried up to {})", + dir_path.display(), + path.display() + )); + } + let Some(parent_path) = path.parent() else { + return Err(err).context(format!("Unable to watch {} (tried up to {})", dir_path.display(), path.display())); + }; + path = parent_path; + } + } + Ok(()) + } +} + #[turbo_tasks::value(cell = "new", eq = "manual")] pub struct DiskFileSystem { pub name: String, @@ -93,20 +151,36 @@ pub struct DiskFileSystem { dir_invalidator_map: Arc, #[turbo_tasks(debug_ignore, trace_ignore)] #[serde(skip)] - watcher: Mutex>, + watcher: Arc, } impl DiskFileSystem { + /// Returns the root as Path + fn root_path(&self) -> &Path { + simplified(Path::new(&self.root)) + } + /// registers the path as an invalidator for the current task, /// has to be called within a turbo-tasks function - fn register_invalidator(&self, path: impl AsRef, file: bool) { + fn register_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); - if file { - self.invalidator_map.insert(path_to_key(path), invalidator); - } else { - self.dir_invalidator_map - .insert(path_to_key(path), invalidator); + self.invalidator_map.insert(path_to_key(path), invalidator); + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + if let Some(dir) = path.parent() { + self.watcher.ensure_watching(dir, self.root_path())?; } + Ok(()) + } + + /// registers the path as an invalidator for the current task, + /// has to be called within a turbo-tasks function + fn register_dir_invalidator(&self, path: &Path) -> Result<()> { + let invalidator = turbo_tasks::get_invalidator(); + self.dir_invalidator_map + .insert(path_to_key(path), invalidator); + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + self.watcher.ensure_watching(path, self.root_path())?; + Ok(()) } pub fn invalidate(&self) { @@ -119,7 +193,7 @@ impl DiskFileSystem { } pub fn start_watching(&self) -> Result<()> { - let mut watcher_guard = self.watcher.lock().unwrap(); + let mut watcher_guard = self.watcher.watcher.lock().unwrap(); if watcher_guard.is_some() { return Ok(()); } @@ -133,7 +207,12 @@ impl DiskFileSystem { let mut watcher = watcher(tx, Duration::from_millis(1))?; // Add a path to be watched. All files and directories at that path and // below will be monitored for changes. + #[cfg(any(target_os = "macos", target_os = "windows"))] watcher.watch(&root, RecursiveMode::Recursive)?; + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + for dir_path in self.watcher.watching.iter() { + watcher.watch(&*dir_path, RecursiveMode::NonRecursive)?; + } // We need to invalidate all reads that happened before watching // Best is to start_watching before starting to read @@ -145,12 +224,18 @@ impl DiskFileSystem { } watcher_guard.replace(watcher); + drop(watcher_guard); + + let disk_watcher = self.watcher.clone(); + let root_path = self.root_path().to_path_buf(); spawn_thread(move || { let mut batched_invalidate_path = HashSet::new(); let mut batched_invalidate_path_dir = HashSet::new(); let mut batched_invalidate_path_and_children = HashSet::new(); let mut batched_invalidate_path_and_children_dir = HashSet::new(); + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + let mut batched_new_paths = HashSet::new(); 'outer: loop { let mut event = rx.recv().map_err(|e| match e { @@ -161,7 +246,16 @@ impl DiskFileSystem { Ok(DebouncedEvent::Write(path)) => { batched_invalidate_path.insert(path); } - Ok(DebouncedEvent::Create(path)) | Ok(DebouncedEvent::Remove(path)) => { + Ok(DebouncedEvent::Create(path)) => { + batched_invalidate_path_and_children.insert(path.clone()); + batched_invalidate_path_and_children_dir.insert(path.clone()); + if let Some(parent) = path.parent() { + batched_invalidate_path_dir.insert(PathBuf::from(parent)); + } + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + batched_new_paths.insert(path.clone()); + } + Ok(DebouncedEvent::Remove(path)) => { batched_invalidate_path_and_children.insert(path.clone()); batched_invalidate_path_and_children_dir.insert(path.clone()); if let Some(parent) = path.parent() { @@ -177,6 +271,8 @@ impl DiskFileSystem { if let Some(parent) = destination.parent() { batched_invalidate_path_dir.insert(PathBuf::from(parent)); } + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + batched_new_paths.insert(destination.clone()); } Ok(DebouncedEvent::Rescan) => { batched_invalidate_path_and_children.insert(PathBuf::from(&root)); @@ -257,13 +353,19 @@ impl DiskFileSystem { &mut batched_invalidate_path_and_children_dir, ); } + #[cfg(not(any(target_os = "macos", target_os = "windows")))] + { + for path in batched_new_paths.drain() { + let _ = disk_watcher.restore_if_watching(&path, &root_path); + } + } } }); Ok(()) } pub fn stop_watching(&self) { - if let Some(watcher) = self.watcher.lock().unwrap().take() { + if let Some(watcher) = self.watcher.watcher.lock().unwrap().take() { drop(watcher); // thread will detect the stop because the channel is disconnected } @@ -271,7 +373,7 @@ impl DiskFileSystem { pub async fn to_sys_path(&self, fs_path: FileSystemPathVc) -> Result { // just in case there's a windows unc path prefix we remove it with `dunce` - let path = simplified(Path::new(&self.root)); + let path = self.root_path(); let fs_path = fs_path.await?; Ok(if fs_path.path.is_empty() { path.to_path_buf() @@ -299,7 +401,7 @@ impl DiskFileSystemVc { mutex_map: Default::default(), invalidator_map: Arc::new(InvalidatorMap::new()), dir_invalidator_map: Arc::new(InvalidatorMap::new()), - watcher: Mutex::new(None), + watcher: Default::default(), }; Ok(Self::cell(instance)) @@ -329,7 +431,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn read(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, true); + self.register_invalidator(&full_path)?; let content = read_file(full_path, &self.mutex_map).await?; Ok(content.cell()) @@ -338,7 +440,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn read_dir(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, false); + self.register_dir_invalidator(&full_path)?; let fs_path = fs_path.await?; // we use the sync std function here as it's a lot faster (600%) in @@ -392,7 +494,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn read_link(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, true); + self.register_invalidator(&full_path)?; let _lock = self.mutex_map.lock(full_path.clone()).await; let link_path = match retry_future(|| fs::read_link(&full_path)).await { @@ -474,7 +576,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn track(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(full_path, true); + self.register_invalidator(&full_path)?; Ok(CompletionVc::new()) } @@ -629,7 +731,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function] async fn metadata(&self, fs_path: FileSystemPathVc) -> Result { let full_path = self.to_sys_path(fs_path).await?; - self.register_invalidator(&full_path, true); + self.register_invalidator(&full_path)?; let _lock = self.mutex_map.lock(full_path.clone()).await; let meta = retry_future(|| fs::metadata(full_path.clone()))