From 9e2e719979af5cc6ce41d195b0a0b5859f6adeb8 Mon Sep 17 00:00:00 2001 From: daxpedda Date: Sun, 3 Sep 2023 12:53:03 +0200 Subject: [PATCH] Merge `Arc`s --- src/platform_impl/web/async/channel.rs | 45 ++++++++++--------- src/platform_impl/web/async/waker.rs | 61 ++++++++++++-------------- 2 files changed, 49 insertions(+), 57 deletions(-) diff --git a/src/platform_impl/web/async/channel.rs b/src/platform_impl/web/async/channel.rs index 0b6160ea4d9..d91802a77fd 100644 --- a/src/platform_impl/web/async/channel.rs +++ b/src/platform_impl/web/async/channel.rs @@ -8,19 +8,16 @@ use std::task::Poll; pub fn channel() -> (AsyncSender, AsyncReceiver) { let (sender, receiver) = mpsc::channel(); let sender = Arc::new(Mutex::new(sender)); - let waker = Arc::new(AtomicWaker::new()); - let closed = Arc::new(AtomicBool::new(false)); + let inner = Arc::new(Inner { + closed: AtomicBool::new(false), + waker: AtomicWaker::new(), + }); let sender = AsyncSender { sender, - closed: closed.clone(), - waker: Arc::clone(&waker), - }; - let receiver = AsyncReceiver { - receiver, - closed, - waker, + inner: Arc::clone(&inner), }; + let receiver = AsyncReceiver { receiver, inner }; (sender, receiver) } @@ -31,14 +28,13 @@ pub struct AsyncSender { // to wrap it in an `Arc` to make it clonable on the main thread without // having to block. sender: Arc>>, - closed: Arc, - waker: Arc, + inner: Arc, } impl AsyncSender { pub fn send(&self, event: T) -> Result<(), SendError> { self.sender.lock().unwrap().send(event)?; - self.waker.wake(); + self.inner.waker.wake(); Ok(()) } @@ -47,9 +43,8 @@ impl AsyncSender { impl Clone for AsyncSender { fn clone(&self) -> Self { Self { - sender: self.sender.clone(), - waker: self.waker.clone(), - closed: self.closed.clone(), + sender: Arc::clone(&self.sender), + inner: Arc::clone(&self.inner), } } } @@ -58,17 +53,16 @@ impl Drop for AsyncSender { fn drop(&mut self) { // If it's the last + the one held by the receiver make sure to wake it // up and tell it that all receiver have dropped. - if Arc::strong_count(&self.closed) == 2 { - self.closed.store(true, Ordering::Relaxed); - self.waker.wake() + if Arc::strong_count(&self.inner) == 2 { + self.inner.closed.store(true, Ordering::Relaxed); + self.inner.waker.wake() } } } pub struct AsyncReceiver { receiver: Receiver, - closed: Arc, - waker: Arc, + inner: Arc, } impl AsyncReceiver { @@ -76,16 +70,16 @@ impl AsyncReceiver { future::poll_fn(|cx| match self.receiver.try_recv() { Ok(event) => Poll::Ready(Ok(event)), Err(TryRecvError::Empty) => { - if self.closed.load(Ordering::Relaxed) { + if self.inner.closed.load(Ordering::Relaxed) { return Poll::Ready(Err(RecvError)); } - self.waker.register(cx.waker()); + self.inner.waker.register(cx.waker()); match self.receiver.try_recv() { Ok(event) => Poll::Ready(Ok(event)), Err(TryRecvError::Empty) => { - if self.closed.load(Ordering::Relaxed) { + if self.inner.closed.load(Ordering::Relaxed) { Poll::Ready(Err(RecvError)) } else { Poll::Pending @@ -99,3 +93,8 @@ impl AsyncReceiver { .await } } + +struct Inner { + closed: AtomicBool, + waker: AtomicWaker, +} diff --git a/src/platform_impl/web/async/waker.rs b/src/platform_impl/web/async/waker.rs index 12ca6f838b0..83ec7971371 100644 --- a/src/platform_impl/web/async/waker.rs +++ b/src/platform_impl/web/async/waker.rs @@ -7,8 +7,7 @@ use std::task::Poll; pub struct Waker { wrapper: Wrapper, Sender, usize>, - counter: Arc, - waker: Arc, + inner: Arc, } struct Handler { @@ -17,17 +16,13 @@ struct Handler { } #[derive(Clone)] -struct Sender { - counter: Arc, - waker: Arc, - closed: Arc, -} +struct Sender(Arc); impl Drop for Sender { fn drop(&mut self) { - if Arc::strong_count(&self.closed) == 1 { - self.closed.store(true, Ordering::Relaxed); - self.waker.wake(); + if Arc::strong_count(&self.0) == 1 { + self.0.closed.store(true, Ordering::Relaxed); + self.0.waker.wake(); } } } @@ -35,17 +30,15 @@ impl Drop for Sender { impl Waker { #[track_caller] pub fn new(value: T, handler: fn(&T, usize)) -> Option { - let counter = Arc::new(AtomicUsize::new(0)); - let waker = Arc::new(AtomicWaker::new()); - let closed = Arc::new(AtomicBool::new(false)); + let inner = Arc::new(Inner { + counter: AtomicUsize::new(0), + waker: AtomicWaker::new(), + closed: AtomicBool::new(false), + }); let handler = Handler { value, handler }; - let sender = Sender { - counter: Arc::clone(&counter), - waker: Arc::clone(&waker), - closed: Arc::clone(&closed), - }; + let sender = Sender(Arc::clone(&inner)); let wrapper = Wrapper::new( handler, @@ -55,28 +48,27 @@ impl Waker { (handler.handler)(&handler.value, count); }, { - let counter = Arc::clone(&counter); - let waker = Arc::clone(&waker); + let inner = Arc::clone(&inner); move |handler| async move { while let Some(count) = future::poll_fn(|cx| { - let count = counter.swap(0, Ordering::Relaxed); + let count = inner.counter.swap(0, Ordering::Relaxed); if count > 0 { Poll::Ready(Some(count)) } else { - if closed.load(Ordering::Relaxed) { + if inner.closed.load(Ordering::Relaxed) { return Poll::Ready(None); } - waker.register(cx.waker()); + inner.waker.register(cx.waker()); - let count = counter.swap(0, Ordering::Relaxed); + let count = inner.counter.swap(0, Ordering::Relaxed); if count > 0 { Poll::Ready(Some(count)) } else { - if closed.load(Ordering::Relaxed) { + if inner.closed.load(Ordering::Relaxed) { return Poll::Ready(None); } @@ -94,16 +86,12 @@ impl Waker { }, sender, |inner, _| { - inner.counter.fetch_add(1, Ordering::Relaxed); - inner.waker.wake(); + inner.0.counter.fetch_add(1, Ordering::Relaxed); + inner.0.waker.wake(); }, )?; - Some(Self { - wrapper, - counter, - waker, - }) + Some(Self { wrapper, inner }) } pub fn wake(&self) { @@ -115,8 +103,13 @@ impl Clone for Waker { fn clone(&self) -> Self { Self { wrapper: self.wrapper.clone(), - counter: Arc::clone(&self.counter), - waker: Arc::clone(&self.waker), + inner: Arc::clone(&self.inner), } } } + +struct Inner { + counter: AtomicUsize, + waker: AtomicWaker, + closed: AtomicBool, +}