Skip to content

Commit

Permalink
fix(turborepo): Watch mode not responding to changes (#8057)
Browse files Browse the repository at this point in the history
### Description

When I converted to a `watch` channel, I ended up introducing an annoying bug. Basically you can't tell if the value has been read already in a `watch` channel. So if you send a rediscover, then with subsequent events it's impossible to determine if the rediscover has been seen and you should send a new event, or if it's not seen and therefore you should send nothing.

I fixed this by reverting back to the lock version and addressed @gsoltis's comments by using a tokio Mutex which yields to the runtime when grabbing the lock.

### Testing Instructions

Validated that we get the right change events. Also validated that with regular Mutex and no `yield_now`, we get a deadlock, while with tokio Mutex and no `yield_now` we don't deadlock.


Closes TURBO-2907
  • Loading branch information
NicholasLYang authored Apr 30, 2024
1 parent a82c8d3 commit b70ba36
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::HashSet;
use std::{cell::RefCell, collections::HashSet, sync::Arc};

use futures::StreamExt;
use miette::{Diagnostic, SourceSpan};
use thiserror::Error;
use tokio::{
select,
sync::watch,
task::{yield_now, JoinHandle},
sync::{Mutex, Notify},
task::JoinHandle,
};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
Expand All @@ -22,8 +22,7 @@ use crate::{
DaemonConnector, DaemonPaths,
};

#[derive(Clone)]
pub enum ChangedPackages {
enum ChangedPackages {
All,
Some(HashSet<PackageName>),
}
Expand Down Expand Up @@ -89,10 +88,6 @@ pub enum Error {
SignalInterrupt,
#[error("package change error")]
PackageChange(#[from] tonic::Status),
#[error("changed packages channel closed, cannot receive new changes")]
ChangedPackagesRecv(#[from] watch::error::RecvError),
#[error("changed packages channel closed, cannot send new changes")]
ChangedPackagesSend(#[from] watch::error::SendError<ChangedPackages>),
}

impl WatchClient {
Expand Down Expand Up @@ -140,23 +135,31 @@ impl WatchClient {

let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;

let (changed_pkgs_tx, mut changed_pkgs_rx) = watch::channel(ChangedPackages::default());
// We explicitly use a tokio::sync::Mutex here to avoid deadlocks.
// If we used a std::sync::Mutex, we could deadlock by spinning the lock
// and not yielding back to the tokio runtime.
let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default()));
let notify_run = Arc::new(Notify::new());
let notify_event = notify_run.clone();

let event_fut = async {
while let Some(event) = events.next().await {
let event = event?;
Self::handle_change_event(&changed_pkgs_tx, event.event.unwrap()).await?;
Self::handle_change_event(&changed_packages, event.event.unwrap()).await?;
notify_event.notify_one();
}

Err(Error::ConnectionClosed)
};

let run_fut = async {
loop {
changed_pkgs_rx.changed().await?;
let changed_pkgs = { changed_pkgs_rx.borrow_and_update().clone() };

self.execute_run(changed_pkgs).await?;
notify_run.notified().await;
let changed_packages_guard = changed_packages.lock().await;
if !changed_packages_guard.borrow().is_empty() {
let changed_packages = changed_packages_guard.take();
self.execute_run(changed_packages).await?;
}
}
};

Expand All @@ -177,7 +180,7 @@ impl WatchClient {
}

async fn handle_change_event(
changed_packages_tx: &watch::Sender<ChangedPackages>,
changed_packages: &Mutex<RefCell<ChangedPackages>>,
event: proto::package_change_event::Event,
) -> Result<(), Error> {
// Should we recover here?
Expand All @@ -187,17 +190,17 @@ impl WatchClient {
}) => {
let package_name = PackageName::from(package_name);

changed_packages_tx.send_if_modified(|changed_pkgs| match changed_pkgs {
ChangedPackages::All => false,
match changed_packages.lock().await.get_mut() {
ChangedPackages::All => {
// If we've already changed all packages, ignore
}
ChangedPackages::Some(ref mut pkgs) => {
pkgs.insert(package_name);

true
}
});
}
}
proto::package_change_event::Event::RediscoverPackages(_) => {
changed_packages_tx.send(ChangedPackages::All)?;
*changed_packages.lock().await.get_mut() = ChangedPackages::All;
}
proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
return Err(DaemonError::Unavailable(message).into());
Expand Down

0 comments on commit b70ba36

Please sign in to comment.