From 8229aacf716022a1cc34a85e0909bba5ffe4b3b4 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 21 Sep 2023 13:50:19 +0200 Subject: [PATCH 1/2] Fix typo --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7287dd9faaf..fa1ad05139c 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -699,7 +699,7 @@ impl Receiver { changed_impl(&self.shared, &mut self.version).await } - /// Waits for a value that satisifes the provided condition. + /// Waits for a value that satisfies the provided condition. /// /// This method will call the provided closure whenever something is sent on /// the channel. Once the closure returns `true`, this method will return a From ee1369116bda6efc82b21771ea54bf2a9a8a36cb Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 21 Sep 2023 21:19:50 +0200 Subject: [PATCH 2/2] watch::Receiver::wait_for(): Prevent poisoning of the lock --- tokio/src/sync/watch.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index fa1ad05139c..c5c82f90fd8 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -772,8 +772,23 @@ impl Receiver { let has_changed = self.version != new_version; self.version = new_version; - if (!closed || has_changed) && f(&inner) { - return Ok(Ref { inner, has_changed }); + if !closed || has_changed { + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner))); + match result { + Ok(true) => { + return Ok(Ref { inner, has_changed }); + } + Ok(false) => { + // Skip the value. + } + Err(panicked) => { + // Drop the read-lock to avoid poisoning it. + drop(inner); + // Forward the panic to the caller. + panic::resume_unwind(panicked); + // Unreachable + } + }; } }