From d34154b6c6835f33cfad45a3eebb4ad7500eb6d0 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 2 May 2024 21:14:51 -0400 Subject: [PATCH] Remove will_wake calls We used `will_wake` in correctness checks to make sure we don't hang the future/stream forever if a wrong waker is used, but the contract for this function is best-effort and it started generating more and more false positives. The recent one, found by @aleiserson, is inside rust-lang (https://github.com/rust-lang/rust/pull/119863/), so it is very hard to get around. We don't have a good replacement for this check unless we implement our own waker, but it is probably too much for now --- .../src/helpers/buffers/ordering_sender.rs | 1 - .../src/helpers/buffers/unordered_receiver.rs | 19 ++++------ .../helpers/transport/stream/collection.rs | 37 ++++++++----------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/ipa-core/src/helpers/buffers/ordering_sender.rs b/ipa-core/src/helpers/buffers/ordering_sender.rs index e8a21bda6..8aba0610d 100644 --- a/ipa-core/src/helpers/buffers/ordering_sender.rs +++ b/ipa-core/src/helpers/buffers/ordering_sender.rs @@ -182,7 +182,6 @@ impl WaitingShard { match self.wakers[j].i.cmp(&i) { Ordering::Greater => (), Ordering::Equal => { - assert!(item.w.will_wake(&self.wakers[j].w)); self.wakers[j] = item; return Ok(()); } diff --git a/ipa-core/src/helpers/buffers/unordered_receiver.rs b/ipa-core/src/helpers/buffers/unordered_receiver.rs index 573024626..ef5e2a09f 100644 --- a/ipa-core/src/helpers/buffers/unordered_receiver.rs +++ b/ipa-core/src/helpers/buffers/unordered_receiver.rs @@ -59,7 +59,7 @@ where if recv.is_next(this.i) { recv.poll_next(cx) } else { - recv.add_waker(this.i, cx.waker().clone()); + recv.add_waker(this.i, cx.waker()); Poll::Pending } } @@ -190,7 +190,7 @@ where /// /// [`recv`]: UnorderedReceiver::recv /// [`poll`]: Future::poll - fn add_waker(&mut self, i: usize, waker: Waker) { + fn add_waker(&mut self, i: usize, waker: &Waker) { assert!( i > self.next, "Awaiting a read (record = {i}) that has already been fulfilled. Read cursor is currently at {}", self.next @@ -198,20 +198,17 @@ where // We don't save a waker at `self.next`, so `>` and not `>=`. if i > self.next + self.wakers.len() { #[cfg(feature = "stall-detection")] - let overflow = (waker, i); + let overflow = (waker.clone(), i); #[cfg(not(feature = "stall-detection"))] - let overflow = waker; + let overflow = waker.clone(); self.overflow_wakers.push(overflow); } else { let index = i % self.wakers.len(); - if let Some(old) = self.wakers[index].as_ref() { - // We are OK with having multiple polls of the same `Receiver` - // (or two `Receiver`s for the same item being polled). - // However, as we are only tracking one waker, they both need - // to be woken when we invoke the waker we get. - assert!(waker.will_wake(old)); + if let Some(old) = self.wakers[index].as_mut() { + old.clone_from(waker); + } else { + self.wakers[index] = Some(waker.clone()); } - self.wakers[index] = Some(waker); } } diff --git a/ipa-core/src/helpers/transport/stream/collection.rs b/ipa-core/src/helpers/transport/stream/collection.rs index b5b1a8bcf..d67e58283 100644 --- a/ipa-core/src/helpers/transport/stream/collection.rs +++ b/ipa-core/src/helpers/transport/stream/collection.rs @@ -81,29 +81,24 @@ impl StreamCollection { let mut streams = self.inner.lock().unwrap(); match streams.entry(key.clone()) { - Entry::Occupied(mut entry) => { - match entry.get_mut() { - StreamState::Waiting(old_waker) => { - let will_wake = old_waker.will_wake(waker); - drop(streams); // avoid mutex poisoning - assert!(will_wake); - None - } - rs @ StreamState::Ready(_) => { - let StreamState::Ready(stream) = - std::mem::replace(rs, StreamState::Completed) - else { - unreachable!(); - }; + Entry::Occupied(mut entry) => match entry.get_mut() { + StreamState::Waiting(old_waker) => { + old_waker.clone_from(waker); + None + } + rs @ StreamState::Ready(_) => { + let StreamState::Ready(stream) = std::mem::replace(rs, StreamState::Completed) + else { + unreachable!(); + }; - Some(stream) - } - StreamState::Completed => { - drop(streams); - panic!("{key:?} stream has been consumed already") - } + Some(stream) } - } + StreamState::Completed => { + drop(streams); + panic!("{key:?} stream has been consumed already") + } + }, Entry::Vacant(entry) => { entry.insert(StreamState::Waiting(waker.clone())); None