Skip to content

Commit

Permalink
chore(watch): simplify watch data synchronization (#9154)
Browse files Browse the repository at this point in the history
### Description

Basic view of the futures involved with running `watch`

- The `events_fut` takes events from the package changes watcher and
folds them into a `ChangedPackages` struct. It then triggers a notify.
- The `run_fut` loops waiting for a notification and then performing a
run.
 - Signal handler watching for `SIGINT`s

This PR cleans up how packages changes are synchronized between futures:
- Removes `RefCell` as `Mutex` already implies mutually exclusive access
the the underlying resource
- Remove `run_fut` holding onto the changed packages lock for the
entirety of it's run. This allows us to switch from `tokio::sync::Mutex`
to `std::sync::Mutex`. Which is suggested by the [`tokio::sync::Mutex`
docs](https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use)

### Testing Instructions

`rustc` + 👀 . Quick gut check by running `turbo watch` and triggering
package changes
  • Loading branch information
chris-olszewski committed Sep 17, 2024
1 parent 2947eaa commit ed8ff72
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{cell::RefCell, collections::HashSet, sync::Arc};
use std::{
collections::HashSet,
ops::DerefMut as _,
sync::{Arc, Mutex},
};

use futures::StreamExt;
use miette::{Diagnostic, SourceSpan};
use thiserror::Error;
use tokio::{
select,
sync::{Mutex, Notify},
task::JoinHandle,
};
use tokio::{select, sync::Notify, task::JoinHandle};
use tracing::{instrument, trace};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
Expand Down Expand Up @@ -172,14 +172,14 @@ impl WatchClient {
// We explicitly use a tokio::sync::Mutex here to avoid deadlocks.
// If we used a std::sync::Mutex, we could deadlock by spinning the lock
// and not yielding back to the tokio runtime.
let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default()));
let changed_packages = Mutex::new(ChangedPackages::default());
let notify_run = Arc::new(Notify::new());
let notify_event = notify_run.clone();

let event_fut = async {
while let Some(event) = events.next().await {
let event = event?;
Self::handle_change_event(&changed_packages, event.event.unwrap()).await?;
Self::handle_change_event(&changed_packages, event.event.unwrap())?;
notify_event.notify_one();
}

Expand All @@ -189,9 +189,13 @@ impl WatchClient {
let run_fut = async {
loop {
notify_run.notified().await;
let changed_packages_guard = changed_packages.lock().await;
if !changed_packages_guard.borrow().is_empty() {
let changed_packages = changed_packages_guard.take();
let some_changed_packages = {
let mut changed_packages_guard =
changed_packages.lock().expect("poisoned lock");
(!changed_packages_guard.is_empty())
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
};
if let Some(changed_packages) = some_changed_packages {
self.execute_run(changed_packages).await?;
}
}
Expand All @@ -214,8 +218,8 @@ impl WatchClient {
}

#[instrument(skip(changed_packages))]
async fn handle_change_event(
changed_packages: &Mutex<RefCell<ChangedPackages>>,
fn handle_change_event(
changed_packages: &Mutex<ChangedPackages>,
event: proto::package_change_event::Event,
) -> Result<(), Error> {
// Should we recover here?
Expand All @@ -225,7 +229,7 @@ impl WatchClient {
}) => {
let package_name = PackageName::from(package_name);

match changed_packages.lock().await.get_mut() {
match changed_packages.lock().expect("poisoned lock").deref_mut() {
ChangedPackages::All => {
// If we've already changed all packages, ignore
}
Expand All @@ -235,7 +239,7 @@ impl WatchClient {
}
}
proto::package_change_event::Event::RediscoverPackages(_) => {
*changed_packages.lock().await.get_mut() = ChangedPackages::All;
*changed_packages.lock().expect("poisoned lock") = ChangedPackages::All;
}
proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
return Err(DaemonError::Unavailable(message).into());
Expand Down

0 comments on commit ed8ff72

Please sign in to comment.