Skip to content

Commit

Permalink
only use recursive watchers on macOS and windows (#4100)
Browse files Browse the repository at this point in the history
### Description

Only windows and macOS support real recursive file watchers, other OS
emulate it by walking the directory structure and watching all
directories. But that might be really slow and use up a lot of watchers,
so we don't want that. Instead we know exactly which directories are
used and can watch selectively directories when they are accessed.

* only use recursive watchers on macOS and windows
* fallback to non-recursively watch read directories on other OS

Note that this implementation still has some bugs when renaming folders,
but we can probably figure out these edge cases later...
  • Loading branch information
sokra authored Mar 7, 2023
1 parent 82920e7 commit c0bb681
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ auto-hash-map = { workspace = true }
bitflags = "1.3.2"
bytes = "1.1.0"
concurrent-queue = { workspace = true }
dashmap = { workspace = true }
dunce = { workspace = true }
futures = { workspace = true }
futures-retry = { workspace = true }
Expand Down
138 changes: 120 additions & 18 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
path::{Path, PathBuf, MAIN_SEPARATOR},
sync::{
mpsc::{channel, RecvError, TryRecvError},
Arc, Mutex,
Arc, Mutex, MutexGuard,
},
time::Duration,
};
Expand Down Expand Up @@ -80,6 +80,64 @@ pub trait FileSystem: ValueToString {
fn metadata(&self, fs_path: FileSystemPathVc) -> FileMetaVc;
}

#[derive(Default)]
struct DiskWatcher {
watcher: Mutex<Option<RecommendedWatcher>>,
/// Keeps track of which directories are currently watched. This is only
/// used on a OS that doesn't support recursive watching.
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
watching: dashmap::DashSet<PathBuf>,
}

impl DiskWatcher {
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn restore_if_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> {
if self.watching.contains(dir_path) {
let mut watcher = self.watcher.lock().unwrap();
self.start_watching(&mut watcher, dir_path, &root_path)?;
}
Ok(())
}

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn ensure_watching(&self, dir_path: &Path, root_path: &Path) -> Result<()> {
if self.watching.contains(dir_path) {
return Ok(());
}
let mut watcher = self.watcher.lock().unwrap();
if self.watching.insert(dir_path.to_path_buf()) {
self.start_watching(&mut watcher, dir_path, root_path)?;
}
Ok(())
}

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn start_watching(
&self,
watcher: &mut MutexGuard<Option<RecommendedWatcher>>,
dir_path: &Path,
root_path: &Path,
) -> Result<()> {
if let Some(watcher) = watcher.as_mut() {
let mut path = dir_path;
while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) {
if path == root_path {
return Err(err).context(format!(
"Unable to watch {} (tried up to {})",
dir_path.display(),
path.display()
));
}
let Some(parent_path) = path.parent() else {
return Err(err).context(format!("Unable to watch {} (tried up to {})", dir_path.display(), path.display()));
};
path = parent_path;
}
}
Ok(())
}
}

#[turbo_tasks::value(cell = "new", eq = "manual")]
pub struct DiskFileSystem {
pub name: String,
Expand All @@ -93,20 +151,36 @@ pub struct DiskFileSystem {
dir_invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
#[serde(skip)]
watcher: Mutex<Option<RecommendedWatcher>>,
watcher: Arc<DiskWatcher>,
}

impl DiskFileSystem {
/// Returns the root as Path
fn root_path(&self) -> &Path {
simplified(Path::new(&self.root))
}

/// registers the path as an invalidator for the current task,
/// has to be called within a turbo-tasks function
fn register_invalidator(&self, path: impl AsRef<Path>, file: bool) {
fn register_invalidator(&self, path: &Path) -> Result<()> {
let invalidator = turbo_tasks::get_invalidator();
if file {
self.invalidator_map.insert(path_to_key(path), invalidator);
} else {
self.dir_invalidator_map
.insert(path_to_key(path), invalidator);
self.invalidator_map.insert(path_to_key(path), invalidator);
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
if let Some(dir) = path.parent() {
self.watcher.ensure_watching(dir, self.root_path())?;
}
Ok(())
}

/// registers the path as an invalidator for the current task,
/// has to be called within a turbo-tasks function
fn register_dir_invalidator(&self, path: &Path) -> Result<()> {
let invalidator = turbo_tasks::get_invalidator();
self.dir_invalidator_map
.insert(path_to_key(path), invalidator);
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
self.watcher.ensure_watching(path, self.root_path())?;
Ok(())
}

pub fn invalidate(&self) {
Expand All @@ -119,7 +193,7 @@ impl DiskFileSystem {
}

pub fn start_watching(&self) -> Result<()> {
let mut watcher_guard = self.watcher.lock().unwrap();
let mut watcher_guard = self.watcher.watcher.lock().unwrap();
if watcher_guard.is_some() {
return Ok(());
}
Expand All @@ -133,7 +207,12 @@ impl DiskFileSystem {
let mut watcher = watcher(tx, Duration::from_millis(1))?;
// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
#[cfg(any(target_os = "macos", target_os = "windows"))]
watcher.watch(&root, RecursiveMode::Recursive)?;
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
for dir_path in self.watcher.watching.iter() {
watcher.watch(&*dir_path, RecursiveMode::NonRecursive)?;
}

// We need to invalidate all reads that happened before watching
// Best is to start_watching before starting to read
Expand All @@ -145,12 +224,18 @@ impl DiskFileSystem {
}

watcher_guard.replace(watcher);
drop(watcher_guard);

let disk_watcher = self.watcher.clone();
let root_path = self.root_path().to_path_buf();

spawn_thread(move || {
let mut batched_invalidate_path = HashSet::new();
let mut batched_invalidate_path_dir = HashSet::new();
let mut batched_invalidate_path_and_children = HashSet::new();
let mut batched_invalidate_path_and_children_dir = HashSet::new();
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
let mut batched_new_paths = HashSet::new();

'outer: loop {
let mut event = rx.recv().map_err(|e| match e {
Expand All @@ -161,7 +246,16 @@ impl DiskFileSystem {
Ok(DebouncedEvent::Write(path)) => {
batched_invalidate_path.insert(path);
}
Ok(DebouncedEvent::Create(path)) | Ok(DebouncedEvent::Remove(path)) => {
Ok(DebouncedEvent::Create(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(path.clone());
}
Ok(DebouncedEvent::Remove(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
Expand All @@ -177,6 +271,8 @@ impl DiskFileSystem {
if let Some(parent) = destination.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(destination.clone());
}
Ok(DebouncedEvent::Rescan) => {
batched_invalidate_path_and_children.insert(PathBuf::from(&root));
Expand Down Expand Up @@ -257,21 +353,27 @@ impl DiskFileSystem {
&mut batched_invalidate_path_and_children_dir,
);
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
{
for path in batched_new_paths.drain() {
let _ = disk_watcher.restore_if_watching(&path, &root_path);
}
}
}
});
Ok(())
}

pub fn stop_watching(&self) {
if let Some(watcher) = self.watcher.lock().unwrap().take() {
if let Some(watcher) = self.watcher.watcher.lock().unwrap().take() {
drop(watcher);
// thread will detect the stop because the channel is disconnected
}
}

pub async fn to_sys_path(&self, fs_path: FileSystemPathVc) -> Result<PathBuf> {
// just in case there's a windows unc path prefix we remove it with `dunce`
let path = simplified(Path::new(&self.root));
let path = self.root_path();
let fs_path = fs_path.await?;
Ok(if fs_path.path.is_empty() {
path.to_path_buf()
Expand Down Expand Up @@ -299,7 +401,7 @@ impl DiskFileSystemVc {
mutex_map: Default::default(),
invalidator_map: Arc::new(InvalidatorMap::new()),
dir_invalidator_map: Arc::new(InvalidatorMap::new()),
watcher: Mutex::new(None),
watcher: Default::default(),
};

Ok(Self::cell(instance))
Expand Down Expand Up @@ -329,7 +431,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read(&self, fs_path: FileSystemPathVc) -> Result<FileContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);
self.register_invalidator(&full_path)?;

let content = read_file(full_path, &self.mutex_map).await?;
Ok(content.cell())
Expand All @@ -338,7 +440,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read_dir(&self, fs_path: FileSystemPathVc) -> Result<DirectoryContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, false);
self.register_dir_invalidator(&full_path)?;
let fs_path = fs_path.await?;

// we use the sync std function here as it's a lot faster (600%) in
Expand Down Expand Up @@ -392,7 +494,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn read_link(&self, fs_path: FileSystemPathVc) -> Result<LinkContentVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);
self.register_invalidator(&full_path)?;

let _lock = self.mutex_map.lock(full_path.clone()).await;
let link_path = match retry_future(|| fs::read_link(&full_path)).await {
Expand Down Expand Up @@ -474,7 +576,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn track(&self, fs_path: FileSystemPathVc) -> Result<CompletionVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(full_path, true);
self.register_invalidator(&full_path)?;
Ok(CompletionVc::new())
}

Expand Down Expand Up @@ -629,7 +731,7 @@ impl FileSystem for DiskFileSystem {
#[turbo_tasks::function]
async fn metadata(&self, fs_path: FileSystemPathVc) -> Result<FileMetaVc> {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);
self.register_invalidator(&full_path)?;

let _lock = self.mutex_map.lock(full_path.clone()).await;
let meta = retry_future(|| fs::metadata(full_path.clone()))
Expand Down

0 comments on commit c0bb681

Please sign in to comment.