Skip to content

Commit

Permalink
make ctx.spawn blocking (#3337)
Browse files Browse the repository at this point in the history
* make spawn sync

* improve error type
  • Loading branch information
ordian committed Jun 22, 2021
1 parent 6a44843 commit e58731d
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 49 deletions.
3 changes: 1 addition & 2 deletions metered-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,13 @@ mod tests {
#[test]
fn failed_send_does_not_inc_sent() {
let (mut bounded, _) = channel::<Msg>(5);
let (mut unbounded, _) = unbounded::<Msg>();
let (unbounded, _) = unbounded::<Msg>();

block_on(async move {
assert!(bounded.send(Msg::default()).await.is_err());
assert!(bounded.try_send(Msg::default()).is_err());
assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 });

assert!(unbounded.send(Msg::default()).await.is_err());
assert!(unbounded.unbounded_send(Msg::default()).is_err());
assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 });
});
Expand Down
48 changes: 1 addition & 47 deletions metered-channel/src/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Metered variant of unbounded mpsc channels to be able to extract metrics.

use futures::{channel::mpsc, task::Poll, task::Context, sink::SinkExt, stream::Stream};
use futures::{channel::mpsc, task::Poll, task::Context, stream::Stream};

use std::result;
use std::pin::Pin;
Expand Down Expand Up @@ -130,21 +130,6 @@ impl<T> UnboundedMeteredSender<T> {
&self.meter
}

/// Send message, wait until capacity is available.
pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError>
where
Self: Unpin,
{
self.meter.note_sent();
let fut = self.inner.send(item);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
}


/// Attempt to send message or fail immediately.
pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.meter.note_sent();
Expand All @@ -154,34 +139,3 @@ impl<T> UnboundedMeteredSender<T> {
})
}
}

impl<T> futures::sink::Sink<T> for UnboundedMeteredSender<T> {
type Error = <futures::channel::mpsc::UnboundedSender<T> as futures::sink::Sink<T>>::Error;

fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::new(&mut self.inner).start_send(item)
}

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.inner).poll_ready(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
val @ Poll::Ready(_)=> {
val
}
other => other,
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
val @ Poll::Ready(_)=> {
self.meter.note_sent();
val
}
other => other,
}
}
}

0 comments on commit e58731d

Please sign in to comment.