Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: WASM client via web-sys transport #648

Merged
merged 41 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
261adb9
feat: untested web-sys transport
niklasad1 Jan 7, 2022
6e8324d
rewrite me
niklasad1 Jan 10, 2022
6831194
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Feb 2, 2022
d671947
make it work
niklasad1 Feb 8, 2022
908a156
add hacks and works :)
niklasad1 Feb 8, 2022
b6e1e04
add subscription test too
niklasad1 Feb 8, 2022
0497648
revert StdError change; still works
niklasad1 Feb 9, 2022
267d260
cleanup
niklasad1 Feb 9, 2022
e03566b
remove hacks
niklasad1 Feb 9, 2022
8307117
more wasm tests outside workspace
niklasad1 Feb 9, 2022
d2769e0
kill mutually exclusive features
niklasad1 Feb 9, 2022
6346a8b
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Feb 9, 2022
f7351f4
merge nits
niklasad1 Feb 9, 2022
7d8a688
remove unsafe hack
niklasad1 Feb 9, 2022
1414280
fix nit
niklasad1 Feb 9, 2022
8e6028e
core: fix features and deps
niklasad1 Feb 10, 2022
e56c099
ci: add WASM test
niklasad1 Feb 10, 2022
12b383b
test again
niklasad1 Feb 10, 2022
337d76d
work work
niklasad1 Feb 10, 2022
520edfc
comeon
niklasad1 Feb 10, 2022
f006173
work work
niklasad1 Feb 10, 2022
9baac7f
revert unintentional change
niklasad1 Feb 10, 2022
d98659b
Update core/Cargo.toml
niklasad1 Feb 15, 2022
56fccd2
Update core/src/client/async_client/mod.rs
niklasad1 Feb 15, 2022
ff3e0b6
revert needless change: std hashmap + fxhashmap works
niklasad1 Feb 15, 2022
40a3bdd
cleanup
niklasad1 Feb 15, 2022
f2389ab
extract try_connect_until fn
niklasad1 Feb 15, 2022
3ac6dbb
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Feb 22, 2022
af3884e
remove todo
niklasad1 Feb 22, 2022
4ec7477
fix bad merge
niklasad1 Feb 22, 2022
34e1dbb
add wasm client wrapper crate
niklasad1 Mar 4, 2022
0b8ff78
fix nits
niklasad1 Mar 4, 2022
6b32049
use gloo-net dependency
niklasad1 Mar 21, 2022
b558166
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Mar 21, 2022
1fc3705
fix build
niklasad1 Mar 22, 2022
0972961
Merge remote-tracking branch 'origin/master' into wasm-client
niklasad1 Apr 20, 2022
c683541
grumbles CI: rename to `wasm_tests`
niklasad1 Apr 20, 2022
ee4bb2f
fix bad merge
niklasad1 Apr 20, 2022
cad8c75
fix grumbles
niklasad1 Apr 20, 2022
7d6e9e5
fix nit
niklasad1 Apr 20, 2022
b2de6d0
comeon CI
niklasad1 Apr 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ members = [
"client/http-client",
"client/transport",
"proc-macros",
"wasm-tests",
]
resolver = "2"
15 changes: 11 additions & 4 deletions client/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ documentation = "https://docs.rs/jsonrpsee-ws-client"
[dependencies]
jsonrpsee-types = { path = "../../types", version = "0.9.0", optional = true }
jsonrpsee-core = { path = "../../core", version = "0.9.0", features = ["client"] }
tracing = { version = "0.1", optional = true }
tracing = "0.1"
thiserror = { version = "1", optional = true }
futures = { version = "0.3.14", default-features = false, features = ["std"], optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
http = { version = "0.2", optional = true }
tokio-util = { version = "0.6", features = ["compat"], optional = true }
tokio = { version = "1", features = ["net", "time", "macros"], optional = true }
Expand All @@ -26,16 +27,22 @@ tokio-rustls = { version = "0.23", optional = true }
# ws
soketto = { version = "0.7.1", optional = true }

# wasm
wasm-bindgen = { version = "0.2.69", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
js-sys = { version = "0.3.46", optional = true }
web-sys = { version = "0.3.46", features = ["BinaryType", "Blob", "ErrorEvent", "FileReader", "MessageEvent", "CloseEvent", "ProgressEvent", "WebSocket", "console"], optional = true }

[features]
tls = ["tokio-rustls", "webpki-roots", "rustls-native-certs"]
ws = [
"futures",
"futures-util",
"http",
"tokio",
"tokio-util",
"soketto",
"pin-project",
"jsonrpsee-types",
"thiserror",
"tracing"
]
wasm = ["wasm-bindgen", "wasm-bindgen-futures", "js-sys", "web-sys", "futures-channel", "futures-util"]
4 changes: 4 additions & 0 deletions client/transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@
/// Websocket transport
#[cfg(feature = "ws")]
pub mod ws;

/// Websocket transport via web-sys.
#[cfg(feature = "wasm")]
pub mod web_sys;
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
98 changes: 98 additions & 0 deletions client/transport/src/web_sys.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use futures_channel::mpsc;
use futures_util::StreamExt;
use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT};
use jsonrpsee_core::{async_trait, Error};
use wasm_bindgen::closure::Closure;
use wasm_bindgen::{JsCast, JsValue};
use web_sys::{CloseEvent, MessageEvent, WebSocket};

/// Sender.
#[derive(Debug)]
pub struct Sender(WebSocket);

// TODO: safety.
unsafe impl Send for Sender {}
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

/// Receiver.
#[derive(Debug)]
pub struct Receiver(mpsc::UnboundedReceiver<String>);

#[async_trait]
impl TransportSenderT for Sender {
type Error = Error;

async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
tracing::trace!("tx: {:?}", msg);
self.0.send_with_str(&msg).map_err(|e| Error::Custom(e.as_string().unwrap()))
}

async fn close(&mut self) -> Result<(), Error> {
self.0.close().map_err(|e| Error::Custom(e.as_string().unwrap()))
}
}

#[async_trait]
impl TransportReceiverT for Receiver {
type Error = Error;

async fn receive(&mut self) -> Result<String, Self::Error> {
match self.0.next().await {
Some(msg) => {
tracing::trace!("rx: {:?}", msg);
Ok(msg)
}
None => Err(Error::Custom("channel closed".into())),
}
}
}

/// Create a transport sender & receiver pair.
pub async fn build_transport(url: impl AsRef<str>) -> Result<(Sender, Receiver), ()> {
let (tx, rx) = mpsc::unbounded();

let websocket = WebSocket::new(url.as_ref()).map_err(|_| ())?;
websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);

let tx1 = tx.clone();

let on_msg_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
// Binary message.
if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let msg = abuf.to_string();
let _ = tx.unbounded_send(msg.into());
// Text message.
} else if let Some(txt) = e.data().as_string() {
let _ = tx.unbounded_send(txt);
} else if let Ok(_blob) = e.data().dyn_into::<web_sys::Blob>() {
tracing::warn!("Received blob message; not supported");
} else {
tracing::warn!("Received unsupported message");
}
}) as Box<dyn FnMut(MessageEvent)>);

// Close event.
let on_close_callback = Closure::wrap(Box::new(move |_e: CloseEvent| {
tracing::info!("Connection closed");
tx1.close_channel();
}) as Box<dyn FnMut(web_sys::CloseEvent)>);

let (conn_tx, mut conn_rx) = mpsc::unbounded();

let on_open_callback = Closure::wrap(Box::new(move |_| {
tracing::info!("Connection established");
conn_tx.unbounded_send(()).expect("rx still alive; qed");
}) as Box<dyn FnMut(JsValue)>);

websocket.set_onopen(Some(on_open_callback.as_ref().unchecked_ref()));
websocket.set_onmessage(Some(on_msg_callback.as_ref().unchecked_ref()));
websocket.set_onclose(Some(on_close_callback.as_ref().unchecked_ref()));

// Prevent for being dropped (this will be leaked intentionally).
on_msg_callback.forget();
on_open_callback.forget();
on_close_callback.forget();

conn_rx.next().await;
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

Ok((Sender(websocket), Receiver(rx)))
}
6 changes: 3 additions & 3 deletions client/transport/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;

use futures::io::{BufReader, BufWriter};
use futures_util::io::{BufReader, BufWriter};
use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use jsonrpsee_core::{async_trait, Cow};
Expand Down Expand Up @@ -188,7 +188,7 @@ impl TransportSenderT for Sender {

/// Sends out a request. Returns a `Future` that finishes when the request has been
/// successfully sent.
async fn send(&mut self, body: String) -> Result<(), WsError> {
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
tracing::debug!("send: {}", body);
self.inner.send_text(body).await?;
self.inner.flush().await?;
Expand All @@ -206,7 +206,7 @@ impl TransportReceiverT for Receiver {
type Error = WsError;

/// Returns a `Future` resolving when the server sent us something back.
async fn receive(&mut self) -> Result<String, WsError> {
async fn receive(&mut self) -> Result<String, Self::Error> {
let mut message = Vec::new();
self.inner.receive_data(&mut message).await?;
let s = String::from_utf8(message).expect("Found invalid UTF-8");
Expand Down
28 changes: 14 additions & 14 deletions client/transport/src/ws/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use futures::io::{IoSlice, IoSliceMut};
use futures::prelude::*;
use futures_util::io::{IoSlice, IoSliceMut};
use futures_util::*;
use pin_project::pin_project;
use tokio::net::TcpStream;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
Expand All @@ -53,13 +53,13 @@ impl AsyncRead for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read(compat, cx, buf)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read(compat, cx, buf)
}
}
Expand All @@ -73,13 +73,13 @@ impl AsyncRead for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read_vectored(compat, cx, bufs)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncRead::poll_read_vectored(compat, cx, bufs)
}
}
Expand All @@ -91,13 +91,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write(compat, cx, buf)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write(compat, cx, buf)
}
}
Expand All @@ -107,13 +107,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write_vectored(compat, cx, bufs)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_write_vectored(compat, cx, bufs)
}
}
Expand All @@ -123,13 +123,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_flush(compat, cx)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_flush(compat, cx)
}
}
Expand All @@ -139,13 +139,13 @@ impl AsyncWrite for EitherStream {
match self.project() {
EitherStreamProj::Plain(s) => {
let compat = s.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_close(compat, cx)
}
#[cfg(feature = "tls")]
EitherStreamProj::Tls(t) => {
let compat = t.compat_write();
futures::pin_mut!(compat);
futures_util::pin_mut!(compat);
AsyncWrite::poll_close(compat, cx)
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,6 @@ impl<'a> WsClientBuilder<'a> {
.request_timeout(self.request_timeout)
.max_concurrent_requests(self.max_concurrent_requests)
.id_format(self.id_kind)
.build(sender, receiver))
.build_with_tokio(sender, receiver))
}
}
47 changes: 32 additions & 15 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,62 @@ edition = "2021"
license = "MIT"

[dependencies]
# deps
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
anyhow = "1"
arrayvec = "0.7.1"
async-trait = "0.1"
async-channel = { version = "1.6", optional = true }
beef = { version = "0.5.1", features = ["impl_serde"] }
thiserror = "1"
futures-channel = { version = "0.3.14", default-features = false }
jsonrpsee-types = { path = "../types", version = "0.9.0" }
thiserror = "1"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }

# optional deps
async-channel = { version = "1.6", optional = true }
async-lock = { version = "2.4", optional = true }
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"] }
jsonrpsee-types = { path = "../types", version = "0.9.0"}
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
tracing = { version = "0.1", optional = true }
rustc-hash = { version = "1", optional = true }
rustc-hash = { version = "1", default-features = false, optional = true }
hashbrown = { version = "0.12", optional = true }
rand = { version = "0.8", optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
soketto = "0.7.1"
soketto = { version = "0.7.1", optional = true }
parking_lot = { version = "0.12", optional = true }
tokio = { version = "1.8", features = ["rt"], optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-timer = { version = "3", optional = true }

[features]
default = []
http-helpers = ["futures-util"]
http-helpers = ["hyper", "futures-util"]
server = [
"async-channel",
"futures-util",
"rustc-hash",
"rustc-hash/std",
"tracing",
"parking_lot",
"rand",
"tokio",
"soketto",
]
client = ["futures-util"]
client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"]
async-client = [
"async-lock",
"client",
"rustc-hash",
"tokio/sync",
"tokio/macros",
"tokio/time",
"tracing"
"hashbrown",
"tracing",
"futures-timer",
]
async-wasm-client = [
Copy link
Member Author

@niklasad1 niklasad1 Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could reduce the features if the async client was extracted to a separate crate...

then we could do some target specific assertions but won't work here.

"async-lock",
"client",
"wasm-bindgen-futures",
"hashbrown",
"rustc-hash",
"futures-timer/wasm-bindgen",
"tracing",
]

[dev-dependencies]
Expand Down
Loading