Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert async iterators to streams #2401

Merged
merged 2 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ edition = "2018"
cfg-if = "1.0.0"
js-sys = { path = "../js-sys", version = '0.3.46' }
wasm-bindgen = { path = "../..", version = '0.2.69' }
futures-core = { version = '0.3.8', default-features = false, optional = true }

[features]
futures-core-03-stream = ['futures-core']

[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
path = "../web-sys"
Expand All @@ -26,3 +30,4 @@ features = [
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = { path = '../test', version = '0.3.19' }
futures-channel-preview = { version = "0.3.0-alpha.18" }
futures-lite = { version = "1.11.3", default-features = false }
3 changes: 3 additions & 0 deletions crates/futures/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ This crate bridges the gap between a Rust `Future` and a JavaScript
1. From a JavaScript `Promise` into a Rust `Future`.
2. From a Rust `Future` into a JavaScript `Promise`.

Additionally under the feature flag `futures-core-03-stream` there is experimental
support for `AsyncIterator` to `Stream` conversion.

See the [API documentation][docs] for more info.

[docs]: https://rustwasm.github.io/wasm-bindgen/api/wasm_bindgen_futures/
2 changes: 2 additions & 0 deletions crates/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ use std::task::{Context, Poll, Waker};
use wasm_bindgen::prelude::*;

mod queue;
#[cfg(feature = "futures-core-03-stream")]
pub mod stream;

mod task {
use cfg_if::cfg_if;
Expand Down
81 changes: 81 additions & 0 deletions crates/futures/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s.
//!
//! Analogous to the promise to future convertion, this module allows the
//! turing objects implementing the async iterator protocol into `Stream`s
//! that produce values that can be awaited from.
//!

use crate::JsFuture;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::stream::Stream;
use js_sys::{AsyncIterator, IteratorNext};
use wasm_bindgen::{prelude::*, JsCast};

/// A `Stream` that yields values from an underlying `AsyncIterator`.
pub struct JsStream {
iter: AsyncIterator,
next: Option<JsFuture>,
done: bool,
}

impl JsStream {
fn next_future(&self) -> Result<JsFuture, JsValue> {
self.iter.next().map(JsFuture::from)
}
}

impl From<AsyncIterator> for JsStream {
fn from(iter: AsyncIterator) -> Self {
JsStream {
iter,
next: None,
done: false,
}
}
}

impl Stream for JsStream {
type Item = Result<JsValue, JsValue>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}

let future = match self.next.as_mut() {
Some(val) => val,
None => match self.next_future() {
Ok(val) => {
self.next = Some(val);
self.next.as_mut().unwrap()
}
Err(e) => {
self.done = true;
return Poll::Ready(Some(Err(e)));
}
},
};

match Pin::new(future).poll(cx) {
Poll::Ready(res) => match res {
Ok(iter_next) => {
let next = iter_next.unchecked_into::<IteratorNext>();
if next.done() {
self.done = true;
Poll::Ready(None)
} else {
self.next.take();
Poll::Ready(Some(Ok(next.value())))
}
}
Err(e) => {
self.done = true;
Poll::Ready(Some(Err(e)))
}
},
Poll::Pending => Poll::Pending,
}
}
}
23 changes: 23 additions & 0 deletions crates/futures/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,26 @@ async fn can_create_multiple_futures_from_same_promise() {
a.await.unwrap();
b.await.unwrap();
}

#[cfg(feature = "futures-core-03-stream")]
#[wasm_bindgen_test]
async fn can_use_an_async_iterable_as_stream() {
use futures_lite::stream::StreamExt;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::stream::JsStream;

let async_iter = js_sys::Function::new_no_args(
"return async function*() {
yield 42;
yield 24;
}()",
)
.call0(&JsValue::undefined())
.unwrap()
.unchecked_into::<js_sys::AsyncIterator>();

let mut stream = JsStream::from(async_iter);
assert_eq!(stream.next().await, Some(Ok(JsValue::from(42))));
assert_eq!(stream.next().await, Some(Ok(JsValue::from(24))));
assert_eq!(stream.next().await, None);
}