From ed8ff7233569ea016e963cf59ea96f81f4b407df Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 17 Sep 2024 11:30:35 -0400 Subject: [PATCH] chore(watch): simplify watch data synchronization (#9154) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Description Basic view of the futures involved with running `watch` - The `events_fut` takes events from the package changes watcher and folds them into a `ChangedPackages` struct. It then triggers a notify. - The `run_fut` loops waiting for a notification and then performing a run. - Signal handler watching for `SIGINT`s This PR cleans up how packages changes are synchronized between futures: - Removes `RefCell` as `Mutex` already implies mutually exclusive access the the underlying resource - Remove `run_fut` holding onto the changed packages lock for the entirety of it's run. This allows us to switch from `tokio::sync::Mutex` to `std::sync::Mutex`. Which is suggested by the [`tokio::sync::Mutex` docs](https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use) ### Testing Instructions `rustc` + 👀 . Quick gut check by running `turbo watch` and triggering package changes --- crates/turborepo-lib/src/run/watch.rs | 34 +++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index f72bd2b0b0c8b..ce16c4c02979a 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -1,13 +1,13 @@ -use std::{cell::RefCell, collections::HashSet, sync::Arc}; +use std::{ + collections::HashSet, + ops::DerefMut as _, + sync::{Arc, Mutex}, +}; use futures::StreamExt; use miette::{Diagnostic, SourceSpan}; use thiserror::Error; -use tokio::{ - select, - sync::{Mutex, Notify}, - task::JoinHandle, -}; +use tokio::{select, sync::Notify, task::JoinHandle}; use tracing::{instrument, trace}; use turborepo_repository::package_graph::PackageName; use turborepo_telemetry::events::command::CommandEventBuilder; @@ -172,14 +172,14 @@ impl WatchClient { // We explicitly use a tokio::sync::Mutex here to avoid deadlocks. // If we used a std::sync::Mutex, we could deadlock by spinning the lock // and not yielding back to the tokio runtime. - let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default())); + let changed_packages = Mutex::new(ChangedPackages::default()); let notify_run = Arc::new(Notify::new()); let notify_event = notify_run.clone(); let event_fut = async { while let Some(event) = events.next().await { let event = event?; - Self::handle_change_event(&changed_packages, event.event.unwrap()).await?; + Self::handle_change_event(&changed_packages, event.event.unwrap())?; notify_event.notify_one(); } @@ -189,9 +189,13 @@ impl WatchClient { let run_fut = async { loop { notify_run.notified().await; - let changed_packages_guard = changed_packages.lock().await; - if !changed_packages_guard.borrow().is_empty() { - let changed_packages = changed_packages_guard.take(); + let some_changed_packages = { + let mut changed_packages_guard = + changed_packages.lock().expect("poisoned lock"); + (!changed_packages_guard.is_empty()) + .then(|| std::mem::take(changed_packages_guard.deref_mut())) + }; + if let Some(changed_packages) = some_changed_packages { self.execute_run(changed_packages).await?; } } @@ -214,8 +218,8 @@ impl WatchClient { } #[instrument(skip(changed_packages))] - async fn handle_change_event( - changed_packages: &Mutex>, + fn handle_change_event( + changed_packages: &Mutex, event: proto::package_change_event::Event, ) -> Result<(), Error> { // Should we recover here? @@ -225,7 +229,7 @@ impl WatchClient { }) => { let package_name = PackageName::from(package_name); - match changed_packages.lock().await.get_mut() { + match changed_packages.lock().expect("poisoned lock").deref_mut() { ChangedPackages::All => { // If we've already changed all packages, ignore } @@ -235,7 +239,7 @@ impl WatchClient { } } proto::package_change_event::Event::RediscoverPackages(_) => { - *changed_packages.lock().await.get_mut() = ChangedPackages::All; + *changed_packages.lock().expect("poisoned lock") = ChangedPackages::All; } proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { return Err(DaemonError::Unavailable(message).into());