Skip to content

Commit

Permalink
feat: add StreamExt::take_until
Browse files Browse the repository at this point in the history
This commit migrates `Stream::take_until` from futures into futures-lite with little modification.

Co-authored-by: zhulin.zzz <zhul289@chinaunicom.cn>
Co-authored-by: John Nunley <jtnunley01@gmail.com>
  • Loading branch information
3 people authored May 28, 2024
1 parent da2f2a2 commit 091964c
Showing 1 changed file with 170 additions and 0 deletions.
170 changes: 170 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,176 @@ impl<F: Future> Stream for OnceFuture<F> {
}
}

/// Take elements from this stream until the provided future resolves.
///
/// This function will take elements from the stream until the provided
/// stopping future `fut` resolves. Once the `fut` future becomes ready,
/// this stream combinator will always return that the stream is done.
///
/// The stopping future may return any type. Once the stream is stopped
/// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
/// The stream may also be resumed with `TakeUntil::take_future()`.
/// See the documentation of [`TakeUntil`] for more information.
///
/// ```
/// use futures_lite::stream::{self, StreamExt, take_until};
/// use futures_lite::future;
/// use std::task::Poll;
///
/// let stream = stream::iter(1..=10);
///
/// # spin_on::spin_on(async {
/// let mut i = 0;
/// let stop_fut = future::poll_fn(|_cx| {
/// i += 1;
/// if i <= 5 {
/// Poll::Pending
/// } else {
/// Poll::Ready(())
/// }
/// });
///
/// let stream = take_until(stream, stop_fut);
///
/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
/// # });
pub fn take_until<S, F>(stream: S, future: F) -> TakeUntil<S, F>
where
S: Sized + Stream,
F: Future,
{
TakeUntil {
stream,
fut: Some(future),
fut_result: None,
free: false,
}
}

pin_project! {
/// Stream for the [`StreamExt::take_until()`] method.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TakeUntil<S: Stream, Fut: Future> {
#[pin]
stream: S,
// Contains the inner Future on start and None once the inner Future is resolved
// or taken out by the user.
#[pin]
fut: Option<Fut>,
// Contains fut's return value once fut is resolved
fut_result: Option<Fut::Output>,
// Whether the future was taken out by the user.
free: bool,
}
}

impl<St, Fut> TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
/// Extract the stopping future out of the combinator.
///
/// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
/// Taking out the future means the combinator will be yielding
/// elements from the wrapped stream without ever stopping it.
pub fn take_future(&mut self) -> Option<Fut> {
if self.fut.is_some() {
self.free = true;
}

self.fut.take()
}

/// Once the stopping future is resolved, this method can be used
/// to extract the value returned by the stopping future.
///
/// This may be used to retrieve arbitrary data from the stopping
/// future, for example a reason why the stream was stopped.
///
/// This method will return `None` if the future isn't resolved yet,
/// or if the result was already taken out.
///
/// # Examples
///
/// ```
/// # spin_on::spin_on(async {
/// use futures_lite::stream::{self, StreamExt, take_until};
/// use futures_lite::future;
/// use std::task::Poll;
///
/// let stream = stream::iter(1..=10);
///
/// let mut i = 0;
/// let stop_fut = future::poll_fn(|_cx| {
/// i += 1;
/// if i <= 5 {
/// Poll::Pending
/// } else {
/// Poll::Ready("reason")
/// }
/// });
///
/// let mut stream = take_until(stream, stop_fut);
/// let _ = (&mut stream).collect::<Vec<_>>().await;
///
/// let result = stream.take_result().unwrap();
/// assert_eq!(result, "reason");
/// # });
/// ```
pub fn take_result(&mut self) -> Option<Fut::Output> {
self.fut_result.take()
}

/// Whether the stream was stopped yet by the stopping future
/// being resolved.
pub fn is_stopped(&self) -> bool {
!self.free && self.fut.is_none()
}
}

impl<St, Fut> Stream for TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
type Item = St::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
let mut this = self.project();

if let Some(f) = this.fut.as_mut().as_pin_mut() {
if let Poll::Ready(result) = f.poll(cx) {
this.fut.set(None);
*this.fut_result = Some(result);
}
}

if !*this.free && this.fut.is_none() {
// Future resolved, inner stream stopped
Poll::Ready(None)
} else {
// Future either not resolved yet or taken out by the user
let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
this.fut.set(None);
}
Poll::Ready(item)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.is_stopped() {
return (0, Some(0));
}

// Original stream can be truncated at any moment, so the lower bound isn't reliable.
let (_, upper_bound) = self.stream.size_hint();
(0, upper_bound)
}
}

/// Extension trait for [`Stream`].
pub trait StreamExt: Stream {
/// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
Expand Down

0 comments on commit 091964c

Please sign in to comment.