Skip to content

Commit

Permalink
fix(rover_std): windows tests (#2180)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronArinder committed Sep 26, 2024
1 parent 342fade commit c8946fb
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 83 deletions.
4 changes: 3 additions & 1 deletion crates/rover-std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
notify = { workspace = true }
notify-debouncer-full = { workspace = true }
rstest = { workspace = true }
speculoos = { workspace = true }
tempfile = { workspace = true }
tempfile = { workspace = true }
284 changes: 202 additions & 82 deletions crates/rover-std/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use std::fs::OpenOptions;
use std::io::{ErrorKind, Write};
use std::path::PathBuf;
use std::{fs, path::Path, time::Duration};

use anyhow::{anyhow, Context};
use camino::{ReadDirUtf8, Utf8Path, Utf8PathBuf};
use notify::event::ModifyKind;
use notify::{EventKind, RecursiveMode, Watcher};
use notify_debouncer_full::{new_debouncer, DebounceEventResult};
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
new_debouncer, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap,
};
use tap::TapFallible;
use tokio::runtime::Handle;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Receiver, Sender as BoundedSender, UnboundedSender};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::{errln, infoln, warnln, RoverStdError};

/// The rate at which we timeout the debouncer
const DEBOUNCER_TIMEOUT: Duration = Duration::from_secs(1);

/// Interact with a file system
#[derive(Default, Copy, Clone)]
pub struct Fs {}
Expand Down Expand Up @@ -235,114 +242,159 @@ impl Fs {
Ok(())
}

/// Spawns a file watcher for a given file, sending events over the channel
/// whenever the file should be re-read
/// Spawns a file watcher for a given file, sending events over the channel whenever the file
/// should be re-read. This is primarily used for composition and so the event emitted is a
/// unit struct. The caller should react to that event as representing a reason to recompose.
///
/// Example:
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let path = "./test.txt";
/// let (file_tx, file_rx) = tokio::sync::mypsc::unbounded_channel();
/// let cancellation_token = Fs::watch_file(path.clone(), file_tx);
///
/// tokio::spawn(move || {
/// Fs::spawn_file_watcher(&path, tx)?;
/// tokio::task::spawn_blocking(move || loop {
/// rx.recv().await;
/// println!("file contents:\n{}", Fs::read_file(&path)?);
/// });
/// while let Some(event) = file_rx.await {
/// // do something
/// }
/// });
///
/// // Cancel and close the watcher
/// cancellation_token.cancel();
/// ```
pub fn watch_file<P>(path: P, tx: WatchSender) -> CancellationToken
where
P: AsRef<Utf8Path>,
{
let path = path.as_ref().to_path_buf();
let path = path.as_std_path().to_path_buf();
infoln!("Watching {} for changes", path.display());
let (fs_tx, mut fs_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(1);

// Sit in the loop, and once we get an event from the file pass it along to the
// waiting channel so that the supergraph can be re-composed.
let tx = tx.clone();
let path = path.clone();
let handle = Handle::current();
// Spawn a debouncer so we don't detect single rather than multiple writes in quick succession,
// use the None parameter to allow it to calculate the tick_rate, in line with previous
// notify implementations.
let debouncer = new_debouncer(
Duration::from_secs(1),
None,
move |result: DebounceEventResult| {
handle.block_on(async {
let _ = fs_tx
.send(result)
.await
.tap_err(|err| warnln!("Failed to send DebounceEventResult: {:?}", err));
});
},
);

let debouncer = match debouncer {
Ok(mut debouncer) => {
let watch_result = debouncer
.watcher()
.watch(&path, RecursiveMode::NonRecursive);
match watch_result {
Ok(_) => Some(debouncer),
Err(err) => {
handle_notify_error(&tx, &path, err);
None
}
}
}
Err(err) => {
handle_notify_error(&tx, &path, err);
None
}
};
let path = path.as_ref().to_path_buf().into_std_path_buf();
let (fs_tx, fs_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(1);

let receive_messages = tokio::spawn(async move {
while let Some(events) = fs_rx.recv().await {
let events = match events {
Err(errs) => {
if let Some(err) = errs.first() {
handle_generic_error(&tx, &path, err);
}
break;
}
Ok(events) => events,
};
for event in events {
if let EventKind::Modify(ModifyKind::Data(_)) = event.kind {
if let Err(err) = tx.send(Ok(())) {
handle_generic_error(&tx, &path, err);
break;
}
}
}
}
});
infoln!("Watching {:?} for changes", path.display());

let runtime_handle = Handle::current();
let debouncer = Fs::debouncer(&runtime_handle, &tx, fs_tx, path.clone());
let receive_messages_join_handle = Fs::receive_messages(tx, fs_rx, path);
let cancellation_token = CancellationToken::new();

tokio::spawn({
let debouncer = debouncer;
let messages_abort_handle = receive_messages_join_handle.abort_handle();
let cancellation_token = cancellation_token.clone();
let messages_abort_handle = receive_messages.abort_handle();

async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::debug!("file watching cancelled");
if let Some(debouncer) = debouncer {
drop(debouncer);
}
messages_abort_handle.abort();
}
_ = async move {
tokio::join!(receive_messages)
tokio::join!(receive_messages_join_handle)
} => {}
}
}
});

cancellation_token
}

/// Spawns a debouncer for use in keeping multiple, successive writes from having to be
/// processed. Rather, events are emitted at the timeout rate and are checked for at particular
/// intervals (see the documentation on `new_debouncer`)
///
/// Returns an option to denote whether we're successfully watching with a debouncer; if not,
/// None is returned
///
/// Development note: the RecommendedWatcher is platform-specific and _might_ be a good place
/// for debugging if you run into weird behavior for the deboucner's watcher
fn debouncer(
runtime_handle: &Handle,
watching_tx: &WatchSender,
fs_tx: BoundedSender<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
path: PathBuf,
) -> Option<Debouncer<RecommendedWatcher, FileIdMap>> {
let path = path.as_path();
let runtime_handle = runtime_handle.clone();

let err_notification = |err: notify::Error| {
handle_notify_error(watching_tx, path, err);
};

// The 'guts' of the debouncer and how it sends file system events
let event_handler = move |result: DebounceEventResult| {
runtime_handle.block_on(async {
let _ = fs_tx
.send(result)
.await
.tap_err(|err| warnln!("Failed to send DebounceEventResult: {:?}", err));
});
};

// Create a new debouncer
new_debouncer(
DEBOUNCER_TIMEOUT,
// The tick rate; when None, notify caltures it for us (1/4th the provided timeout)
None,
event_handler,
)
.map(|mut debouncer| {
debouncer
.watcher()
// Actually begin watching, but with the debouncer; non-recursive because we care
// only about the particular file we're targeting
.watch(path, RecursiveMode::NonRecursive)
.map_err(err_notification)
.map_or(None, |_| Some(debouncer))
})
.map_err(err_notification)
.unwrap_or_default()
}

/// Receive the file system events for ap articular file. The events emitted by particular OSes
/// can differ, with the default Any from notify being the catch-all. See the notes within this
/// function's body for more details on those OS-specific events
fn receive_messages(
watching_tx: WatchSender,
mut fs_rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
path: PathBuf,
) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(events) = fs_rx.recv().await {
let events = match events {
Err(errs) => {
if let Some(err) = errs.first() {
handle_generic_error(&watching_tx, path.as_path(), err);
}
break;
}
Ok(events) => events,
};

for event in events {
match event.kind {
// On unix-based systems, the Modify(Data(..)) tells us that the file was
// modified, but on windows, we have to look for the catch-all event (Any)
// to know whether the file was modified. Strictly speaking, we only need
// to match on Modify(_), but having both here should serve as a reminder
// to future maintainers that file system events need special care for
// windows
EventKind::Modify(ModifyKind::Data(..))
| EventKind::Modify(ModifyKind::Any) => {
if let Err(err) = watching_tx.send(Ok(())) {
handle_generic_error(&watching_tx, &path, err);
break;
}
}
unsupported_event_kind => {
tracing::debug!("encountered an unsupported event while file watching {path:?}, {unsupported_event_kind:?}: {event:?}");
}
}
}
}
})
}
}

type WatchSender = UnboundedSender<Result<(), RoverStdError>>;
Expand Down Expand Up @@ -400,7 +452,7 @@ mod tests {
use camino::Utf8PathBuf;
use rstest::rstest;
use speculoos::prelude::*;
use tempfile::{tempdir, NamedTempFile, TempDir};
use tempfile::{NamedTempFile, TempDir};
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;
use tokio::time::sleep;
Expand Down Expand Up @@ -451,22 +503,90 @@ mod tests {
}
}

//#[tokio::test]
//async fn test_watch_file() -> Result<()> {
// // create a temporary file that we'll make changes to for events to be watched
// let mut file = NamedTempFile::new()?;
// let path = Utf8PathBuf::from_path_buf(file.path().to_path_buf())
// .unwrap_or_else(|path| panic!("Unable to create Utf8PathBuf from path: {:?}", path));

// let (tx, rx) = unbounded_channel();
// let rx = Arc::new(Mutex::new(rx));
// let cancellation_token = Fs::watch_file(path.clone(), tx);
// sleep(Duration::from_millis(1500)).await;
// {
// let rx = rx.lock().await;
// assert_that!(rx.is_empty()).is_true();
// }
// file.write_all(b"some update")?;
// file.flush()?;
// let result = tokio::time::timeout(Duration::from_millis(2000), {
// let rx = rx.clone();
// async move {
// let mut output = None;
// let mut rx = rx.lock().await;
// if let Some(message) = rx.recv().await {
// output = Some(message);
// }
// output
// }
// })
// .await;
// assert_that!(result)
// .is_ok()
// .is_some()
// .is_ok()
// .is_equal_to(());
// {
// let rx = rx.lock().await;
// assert_that!(rx.is_closed()).is_false();
// }
// cancellation_token.cancel();
// // Kick the event loop so that the cancellation future gets called
// sleep(Duration::from_millis(0)).await;
// {
// let rx = rx.lock().await;
// assert_that!(rx.is_closed()).is_true();
// }
// Ok(())
//}

#[tokio::test]
async fn test_watch_file() -> Result<()> {
// create a temporary file that we'll make changes to for events to be watched
let mut file = NamedTempFile::new()?;

let path = Utf8PathBuf::from_path_buf(file.path().to_path_buf())
.unwrap_or_else(|path| panic!("Unable to create Utf8PathBuf from path: {:?}", path));

let (tx, rx) = unbounded_channel();
let rx = Arc::new(Mutex::new(rx));
let cancellation_token = Fs::watch_file(&path, tx);

let cancellation_token = Fs::watch_file(path.clone(), tx);

sleep(Duration::from_millis(1500)).await;

// assert that no events have been emitted yet
{
let rx = rx.lock().await;
assert_that!(rx.is_empty()).is_true();
}
file.write_all(b"test")?;

// do a change that'll emit an event
file.write_all(b"some update")?;
file.flush()?;
let result = tokio::time::timeout(Duration::from_millis(1500), {

let mut writeable_file = OpenOptions::new()
.write(true)
.truncate(true)
.open(path)
.expect("Cannot open file");

writeable_file
.write("some change".as_bytes())
.expect("couldn't write to file");

let result = tokio::time::timeout(Duration::from_millis(2000), {
let rx = rx.clone();
async move {
let mut output = None;
Expand Down

0 comments on commit c8946fb

Please sign in to comment.