Skip to content

Commit

Permalink
Remove duplicated codes
Browse files Browse the repository at this point in the history
  • Loading branch information
doyoubi authored and cramertj committed Nov 13, 2019
1 parent 5b63dfd commit e0599be
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions futures-util/src/stream/stream/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,20 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
}

impl<S: Sink<Item>, Item> SplitSink<S, Item> {
fn poll_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
if this.slot.is_some() {
ready!(inner.as_pin_mut().poll_ready(cx))?;
Poll::Ready(inner.as_pin_mut().start_send(this.slot.take().unwrap()))
fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
if slot.is_some() {
ready!(inner.as_mut().poll_ready(cx))?;
Poll::Ready(inner.start_send(slot.take().unwrap()))
} else {
Poll::Ready(Ok(()))
}
}

fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
}
}

impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
Expand All @@ -82,7 +86,7 @@ impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
if self.slot.is_none() {
return Poll::Ready(Ok(()));
}
ready!(self.as_mut().poll_flush_slot(cx))?;
ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
}
}

Expand All @@ -94,20 +98,14 @@ impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
if this.slot.is_some() {
ready!(inner.as_pin_mut().poll_ready(cx))?;
inner.as_pin_mut().start_send(this.slot.take().unwrap())?;
}
ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
inner.as_pin_mut().poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
if this.slot.is_some() {
ready!(inner.as_pin_mut().poll_ready(cx))?;
inner.as_pin_mut().start_send(this.slot.take().unwrap())?
}
ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
inner.as_pin_mut().poll_close(cx)
}
}
Expand Down

0 comments on commit e0599be

Please sign in to comment.