Skip to content

Commit

Permalink
Add rs-notify implementation of fs::watch (zed-industries#9040)
Browse files Browse the repository at this point in the history
This PR simplifies the Zed file system abstraction and implements
`Fs::watch` for linux and windows.

TODO:
- [x] Figure out why this fails to initialize the file watchers when we
have to initialize the config directory paths, but succeeds on
subsequent runs.
- [x] Fix macOS dependencies on old fsevents::Event crate

Release Notes:

- N/A
  • Loading branch information
mikayla-maki authored Mar 9, 2024
1 parent 456efb5 commit ca696fd
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 493 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/extension/src/extension_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,10 @@ impl ExtensionStore {
let reload_tx = this.reload_tx.clone();
let installed_dir = this.installed_dir.clone();
async move {
let mut events = fs.watch(&installed_dir, FS_WATCH_LATENCY).await;
while let Some(events) = events.next().await {
for event in events {
let Ok(event_path) = event.path.strip_prefix(&installed_dir) else {
let mut paths = fs.watch(&installed_dir, FS_WATCH_LATENCY).await;
while let Some(paths) = paths.next().await {
for path in paths {
let Ok(event_path) = path.strip_prefix(&installed_dir) else {
continue;
};

Expand Down
4 changes: 3 additions & 1 deletion crates/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ path = "src/fs.rs"

[dependencies]
collections.workspace = true
fsevent.workspace = true
rope.workspace = true
text.workspace = true
util.workspace = true
Expand All @@ -37,6 +36,9 @@ time.workspace = true

gpui = { workspace = true, optional = true }

[target.'cfg(target_os = "macos")'.dependencies]
fsevent.workspace = true

[target.'cfg(not(target_os = "macos"))'.dependencies]
notify = "6.1.1"

Expand Down
130 changes: 68 additions & 62 deletions crates/fs/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
pub mod repository;

use anyhow::{anyhow, Result};
pub use fsevent::Event;
#[cfg(target_os = "macos")]
use fsevent::EventStream;

#[cfg(not(target_os = "macos"))]
use fsevent::StreamFlags;

#[cfg(not(target_os = "macos"))]
use notify::{Config, EventKind, Watcher};

#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
Expand Down Expand Up @@ -76,7 +67,7 @@ pub trait Fs: Send + Sync {
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>>;
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>;

fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<Mutex<dyn GitRepository>>>;
fn is_fake(&self) -> bool;
Expand Down Expand Up @@ -327,12 +318,18 @@ impl Fs for RealFs {
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>> {
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
use fsevent::EventStream;

let (tx, rx) = smol::channel::unbounded();
let (stream, handle) = EventStream::new(&[path], latency);
std::thread::spawn(move || {
stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
stream.run(move |events| {
smol::block_on(tx.send(events.into_iter().map(|event| event.path).collect()))
.is_ok()
});
});

Box::pin(rx.chain(futures::stream::once(async move {
drop(handle);
vec![]
Expand All @@ -343,49 +340,66 @@ impl Fs for RealFs {
async fn watch(
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>> {
let (tx, rx) = smol::channel::unbounded();
_latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
use notify::{event::EventKind, Watcher};
// todo(linux): This spawns two threads, while the macOS impl
// only spawns one. Can we use a OnceLock or some such to make
// this better

if !path.exists() {
log::error!("watch path does not exist: {}", path.display());
return Box::pin(rx);
}
let (tx, rx) = smol::channel::unbounded();

let mut watcher =
notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
Ok(event) => {
let flags = match event.kind {
// ITEM_REMOVED is currently the only flag we care about
EventKind::Remove(_) => StreamFlags::ITEM_REMOVED,
_ => StreamFlags::NONE,
};
let events = event
.paths
.into_iter()
.map(|path| Event {
event_id: 0,
flags,
path,
})
.collect::<Vec<_>>();
let _ = tx.try_send(events);
let mut file_watcher = notify::recommended_watcher({
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.log_err() {
tx.try_send(event.paths).ok();
}
Err(err) => {
log::error!("watch error: {}", err);
}
})
.unwrap();

watcher
.configure(Config::default().with_poll_interval(latency))
.unwrap();
}
})
.expect("Could not start file watcher");

watcher
file_watcher
.watch(path, notify::RecursiveMode::Recursive)
.unwrap();
.ok(); // It's ok if this fails, the parent watcher will add it.

let mut parent_watcher = notify::recommended_watcher({
let watched_path = path.to_path_buf();
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.ok() {
if event.paths.into_iter().any(|path| *path == watched_path) {
match event.kind {
EventKind::Create(_) => {
file_watcher
.watch(watched_path.as_path(), notify::RecursiveMode::Recursive)
.log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
EventKind::Remove(_) => {
file_watcher.unwatch(&watched_path).log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
_ => {}
}
}
}
}
})
.expect("Could not start file watcher");

parent_watcher
.watch(
path.parent()
.expect("Watching root is probably not what you want"),
notify::RecursiveMode::NonRecursive,
)
.expect("Could not start watcher on parent directory");

Box::pin(rx)
Box::pin(rx.chain(futures::stream::once(async move {
drop(parent_watcher);
vec![]
})))
}

fn open_repo(&self, dotgit_path: &Path) -> Option<Arc<Mutex<dyn GitRepository>>> {
Expand Down Expand Up @@ -443,10 +457,6 @@ impl Fs for RealFs {
}
}

pub fn fs_events_paths(events: Vec<Event>) -> Vec<PathBuf> {
events.into_iter().map(|event| event.path).collect()
}

#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
// Use an unfair lock to ensure tests are deterministic.
Expand All @@ -459,9 +469,9 @@ struct FakeFsState {
root: Arc<Mutex<FakeFsEntry>>,
next_inode: u64,
next_mtime: SystemTime,
event_txs: Vec<smol::channel::Sender<Vec<fsevent::Event>>>,
event_txs: Vec<smol::channel::Sender<Vec<PathBuf>>>,
events_paused: bool,
buffered_events: Vec<fsevent::Event>,
buffered_events: Vec<PathBuf>,
metadata_call_count: usize,
read_dir_call_count: usize,
}
Expand Down Expand Up @@ -569,11 +579,7 @@ impl FakeFsState {
T: Into<PathBuf>,
{
self.buffered_events
.extend(paths.into_iter().map(|path| fsevent::Event {
event_id: 0,
flags: fsevent::StreamFlags::empty(),
path: path.into(),
}));
.extend(paths.into_iter().map(Into::into));

if !self.events_paused {
self.flush_events(self.buffered_events.len());
Expand Down Expand Up @@ -1328,14 +1334,14 @@ impl Fs for FakeFs {
&self,
path: &Path,
_: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
self.simulate_random_delay().await;
let (tx, rx) = smol::channel::unbounded();
self.state.lock().event_txs.push(tx);
let path = path.to_path_buf();
let executor = self.executor.clone();
Box::pin(futures::StreamExt::filter(rx, move |events| {
let result = events.iter().any(|event| event.path.starts_with(&path));
let result = events.iter().any(|evt_path| evt_path.starts_with(&path));
let executor = executor.clone();
async move {
executor.simulate_random_delay().await;
Expand Down
Loading

0 comments on commit ca696fd

Please sign in to comment.