Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rs-notify implementation of fs::watch #9040

Merged
merged 7 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/extension/src/extension_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,10 @@ impl ExtensionStore {
let reload_tx = this.reload_tx.clone();
let installed_dir = this.installed_dir.clone();
async move {
let mut events = fs.watch(&installed_dir, FS_WATCH_LATENCY).await;
while let Some(events) = events.next().await {
for event in events {
let Ok(event_path) = event.path.strip_prefix(&installed_dir) else {
let mut paths = fs.watch(&installed_dir, FS_WATCH_LATENCY).await;
while let Some(paths) = paths.next().await {
for path in paths {
let Ok(event_path) = path.strip_prefix(&installed_dir) else {
continue;
};

Expand Down
4 changes: 3 additions & 1 deletion crates/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ path = "src/fs.rs"

[dependencies]
collections.workspace = true
fsevent.workspace = true
rope.workspace = true
text.workspace = true
util.workspace = true
Expand All @@ -37,6 +36,9 @@ time.workspace = true

gpui = { workspace = true, optional = true }

[target.'cfg(target_os = "macos")'.dependencies]
fsevent.workspace = true

[target.'cfg(not(target_os = "macos"))'.dependencies]
notify = "6.1.1"

Expand Down
130 changes: 68 additions & 62 deletions crates/fs/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
pub mod repository;

use anyhow::{anyhow, Result};
pub use fsevent::Event;
#[cfg(target_os = "macos")]
use fsevent::EventStream;

#[cfg(not(target_os = "macos"))]
use fsevent::StreamFlags;

#[cfg(not(target_os = "macos"))]
use notify::{Config, EventKind, Watcher};

#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
Expand Down Expand Up @@ -76,7 +67,7 @@ pub trait Fs: Send + Sync {
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>>;
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>;

fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<Mutex<dyn GitRepository>>>;
fn is_fake(&self) -> bool;
Expand Down Expand Up @@ -327,12 +318,18 @@ impl Fs for RealFs {
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>> {
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
use fsevent::EventStream;

let (tx, rx) = smol::channel::unbounded();
let (stream, handle) = EventStream::new(&[path], latency);
std::thread::spawn(move || {
stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
stream.run(move |events| {
smol::block_on(tx.send(events.into_iter().map(|event| event.path).collect()))
.is_ok()
});
});

Box::pin(rx.chain(futures::stream::once(async move {
drop(handle);
vec![]
Expand All @@ -343,49 +340,66 @@ impl Fs for RealFs {
async fn watch(
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>> {
let (tx, rx) = smol::channel::unbounded();
_latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
use notify::{event::EventKind, Watcher};
// todo(linux): This spawns two threads, while the macOS impl
// only spawns one. Can we use a OnceLock or some such to make
// this better

if !path.exists() {
log::error!("watch path does not exist: {}", path.display());
return Box::pin(rx);
}
let (tx, rx) = smol::channel::unbounded();

let mut watcher =
notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
Ok(event) => {
let flags = match event.kind {
// ITEM_REMOVED is currently the only flag we care about
EventKind::Remove(_) => StreamFlags::ITEM_REMOVED,
_ => StreamFlags::NONE,
};
let events = event
.paths
.into_iter()
.map(|path| Event {
event_id: 0,
flags,
path,
})
.collect::<Vec<_>>();
let _ = tx.try_send(events);
let mut file_watcher = notify::recommended_watcher({
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.log_err() {
tx.try_send(event.paths).ok();
}
Err(err) => {
log::error!("watch error: {}", err);
}
})
.unwrap();

watcher
.configure(Config::default().with_poll_interval(latency))
.unwrap();
}
})
.expect("Could not start file watcher");

watcher
file_watcher
.watch(path, notify::RecursiveMode::Recursive)
.unwrap();
.ok(); // It's ok if this fails, the parent watcher will add it.

let mut parent_watcher = notify::recommended_watcher({
let watched_path = path.to_path_buf();
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.ok() {
if event.paths.into_iter().any(|path| *path == watched_path) {
match event.kind {
EventKind::Create(_) => {
file_watcher
.watch(watched_path.as_path(), notify::RecursiveMode::Recursive)
.log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
EventKind::Remove(_) => {
file_watcher.unwatch(&watched_path).log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
_ => {}
}
}
}
}
})
.expect("Could not start file watcher");

parent_watcher
.watch(
path.parent()
.expect("Watching root is probably not what you want"),
notify::RecursiveMode::NonRecursive,
)
.expect("Could not start watcher on parent directory");

Box::pin(rx)
Box::pin(rx.chain(futures::stream::once(async move {
drop(parent_watcher);
vec![]
})))
}

fn open_repo(&self, dotgit_path: &Path) -> Option<Arc<Mutex<dyn GitRepository>>> {
Expand Down Expand Up @@ -443,10 +457,6 @@ impl Fs for RealFs {
}
}

pub fn fs_events_paths(events: Vec<Event>) -> Vec<PathBuf> {
events.into_iter().map(|event| event.path).collect()
}

#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
// Use an unfair lock to ensure tests are deterministic.
Expand All @@ -459,9 +469,9 @@ struct FakeFsState {
root: Arc<Mutex<FakeFsEntry>>,
next_inode: u64,
next_mtime: SystemTime,
event_txs: Vec<smol::channel::Sender<Vec<fsevent::Event>>>,
event_txs: Vec<smol::channel::Sender<Vec<PathBuf>>>,
events_paused: bool,
buffered_events: Vec<fsevent::Event>,
buffered_events: Vec<PathBuf>,
metadata_call_count: usize,
read_dir_call_count: usize,
}
Expand Down Expand Up @@ -569,11 +579,7 @@ impl FakeFsState {
T: Into<PathBuf>,
{
self.buffered_events
.extend(paths.into_iter().map(|path| fsevent::Event {
event_id: 0,
flags: fsevent::StreamFlags::empty(),
path: path.into(),
}));
.extend(paths.into_iter().map(Into::into));

if !self.events_paused {
self.flush_events(self.buffered_events.len());
Expand Down Expand Up @@ -1328,14 +1334,14 @@ impl Fs for FakeFs {
&self,
path: &Path,
_: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
self.simulate_random_delay().await;
let (tx, rx) = smol::channel::unbounded();
self.state.lock().event_txs.push(tx);
let path = path.to_path_buf();
let executor = self.executor.clone();
Box::pin(futures::StreamExt::filter(rx, move |events| {
let result = events.iter().any(|event| event.path.starts_with(&path));
let result = events.iter().any(|evt_path| evt_path.starts_with(&path));
let executor = executor.clone();
async move {
executor.simulate_random_delay().await;
Expand Down
Loading
Loading