Skip to content

Commit

Permalink
Quick and dirty patch to compile client-only transport on wasm32.
Browse files Browse the repository at this point in the history
  • Loading branch information
boxdot committed Feb 3, 2021
1 parent 729308b commit 38817a9
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 46 deletions.
21 changes: 16 additions & 5 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ default = ["transport", "codegen", "prost"]
codegen = ["async-trait"]
transport = [
"h2",
"hyper",
"tokio",
"hyper/full",
"tokio/net",
"tower",
"tower/balance",
"tracing-futures",
"tokio/macros"
]
client = [
"h2",
"hyper/client",
"hyper/http2",
"tokio",
"tower",
]
tls = ["transport", "tokio-rustls"]
tls-roots = ["tls", "rustls-native-certs"]
prost = ["prost1", "prost-derive"]
Expand Down Expand Up @@ -65,16 +73,19 @@ async-trait = { version = "0.1.13", optional = true }

# transport
h2 = { version = "0.3", optional = true }
hyper = { version = "0.14.2", features = ["full"], optional = true }
tokio = { version = "1.0.1", features = ["net"], optional = true }
hyper = { version = "0.14.2", default-features = false, optional = true }
tokio = { version = "1.0.1", default-features = false, optional = true }
tokio-stream = "0.1"
tower = { version = "0.4", features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true}
tower = { version = "0.4", features = ["buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true}
tracing-futures = { version = "0.2", optional = true }

# rustls
tokio-rustls = { version = "0.22", optional = true }
rustls-native-certs = { version = "0.5", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4.18"

[dev-dependencies]
tokio = { version = "1.0", features = ["rt", "macros"] }
static_assertions = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub mod codec;
pub mod metadata;
pub mod server;

#[cfg(feature = "transport")]
#[cfg(any(feature = "transport", feature = "client"))]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
pub mod transport;

Expand Down
6 changes: 3 additions & 3 deletions tonic/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::metadata::MetadataMap;
#[cfg(feature = "transport")]
#[cfg(any(feature = "transport", feature = "client"))]
use crate::transport::Certificate;
use futures_core::Stream;
use http::Extensions;
use std::net::SocketAddr;
#[cfg(feature = "transport")]
#[cfg(any(feature = "transport", feature = "client"))]
use std::sync::Arc;

/// A gRPC request and metadata from an RPC call.
Expand All @@ -18,7 +18,7 @@ pub struct Request<T> {
#[derive(Clone)]
pub(crate) struct ConnectionInfo {
pub(crate) remote_addr: Option<SocketAddr>,
#[cfg(feature = "transport")]
#[cfg(any(feature = "transport", feature = "client"))]
pub(crate) peer_certs: Option<Arc<Vec<Certificate>>>,
}

Expand Down
3 changes: 3 additions & 0 deletions tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl Endpoint {
// FIXME: determine if we want to expose this or not. This is really
// just used in codegen for a shortcut.
#[doc(hidden)]
#[cfg(feature = "transport")]
pub fn new<D>(dst: D) -> Result<Self, Error>
where
D: TryInto<Self>,
Expand Down Expand Up @@ -231,6 +232,7 @@ impl Endpoint {
}

/// Create a channel from this config.
#[cfg(feature = "transport")]
pub async fn connect(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
http.enforce_http(false);
Expand All @@ -250,6 +252,7 @@ impl Endpoint {
///
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
#[cfg(feature = "transport")]
pub fn connect_lazy(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
http.enforce_http(false);
Expand Down
23 changes: 16 additions & 7 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,35 @@ pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;

use super::service::{Connection, DynamicServiceStream};
use super::service::Connection;
#[cfg(feature = "transport")]
use super::service::DynamicServiceStream;
use crate::body::BoxBody;
use bytes::Bytes;
use http::{
uri::{InvalidUri, Uri},
Request, Response,
};
use hyper::client::connect::Connection as HyperConnection;
#[cfg(feature = "transport")]
use std::hash::Hash;
use std::{
fmt,
future::Future,
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::{channel, Sender},
};

use tokio::io::{AsyncRead, AsyncWrite};
#[cfg(feature = "transport")]
use tokio::sync::mpsc::{channel, Sender};

#[cfg(feature = "transport")]
use tower::balance::p2c::Balance;
#[cfg(feature = "transport")]
use tower::discover::{Change, Discover};
use tower::{
buffer::{self, Buffer},
discover::{Change, Discover},
util::{BoxService, Either},
Service,
};
Expand Down Expand Up @@ -108,6 +113,7 @@ impl Channel {
///
/// This creates a [`Channel`] that will load balance accross all the
/// provided endpoints.
#[cfg(feature = "transport")]
pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
Expand All @@ -121,6 +127,7 @@ impl Channel {
/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
#[cfg(feature = "transport")]
pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
where
K: Hash + Eq + Send + Clone + 'static,
Expand All @@ -130,6 +137,7 @@ impl Channel {
(Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
}

#[cfg(feature = "transport")]
pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
where
C: Service<Uri> + Send + 'static,
Expand Down Expand Up @@ -162,6 +170,7 @@ impl Channel {
Ok(Channel { svc })
}

#[cfg(feature = "transport")]
pub(crate) fn balance<D>(discover: D, buffer_size: usize) -> Self
where
D: Discover<Service = Connection> + Unpin + Send + 'static,
Expand Down
20 changes: 20 additions & 0 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,34 @@
//! [rustls]: https://docs.rs/rustls/0.16.0/rustls/

pub mod channel;
#[cfg(feature = "transport")]
pub mod server;

/// Trait that connected IO resources implement.
///
/// The goal for this trait is to allow users to implement
/// custom IO types that can still provide the same connection
/// metadata.
pub trait Connected {
/// Return the remote address this IO resource is connected too.
fn remote_addr(&self) -> Option<std::net::SocketAddr> {
None
}

/// Return the set of connected peer TLS certificates.
fn peer_certs(&self) -> Option<Vec<Certificate>> {
None
}
}

mod error;
mod service;
mod tls;

#[doc(inline)]
pub use self::channel::{Channel, Endpoint};
pub use self::error::Error;
#[cfg(feature = "transport")]
#[doc(inline)]
pub use self::server::{NamedService, Server};
pub use self::tls::{Certificate, Identity};
Expand All @@ -105,6 +124,7 @@ pub use hyper::{Body, Uri};
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::channel::ClientTlsConfig;
#[cfg(feature = "tls")]
#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::server::ServerTlsConfig;

Expand Down
19 changes: 2 additions & 17 deletions tonic/src/transport/server/conn.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
#[cfg(feature = "tls")]
use crate::transport::Certificate;
use crate::transport::Connected;
use hyper::server::conn::AddrStream;
use std::net::SocketAddr;
use tokio::net::TcpStream;
#[cfg(feature = "tls")]
use tokio_rustls::{rustls::Session, server::TlsStream};

/// Trait that connected IO resources implement.
///
/// The goal for this trait is to allow users to implement
/// custom IO types that can still provide the same connection
/// metadata.
pub trait Connected {
/// Return the remote address this IO resource is connected too.
fn remote_addr(&self) -> Option<SocketAddr> {
None
}

/// Return the set of connected peer TLS certificates.
fn peer_certs(&self) -> Option<Vec<Certificate>> {
None
}
}

impl Connected for AddrStream {
fn remote_addr(&self) -> Option<SocketAddr> {
Some(self.remote_addr())
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod incoming;
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;

pub use conn::Connected;
pub use super::Connected;
#[cfg(feature = "tls")]
pub use tls::ServerTlsConfig;

Expand Down
47 changes: 36 additions & 11 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ pub(crate) struct Connection {
inner: BoxService<Request, Response, crate::Error>,
}

#[cfg(target_arch = "wasm32")]
mod wasm {
use std::future::Future;
use std::pin::Pin;

type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

pub(crate) struct Executor;

impl hyper::rt::Executor<BoxSendFuture> for Executor {
fn execute(&self, fut: BoxSendFuture) {
wasm_bindgen_futures::spawn_local(fut)
}
}
}

impl Connection {
fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
where
Expand All @@ -35,21 +51,29 @@ impl Connection {
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
let mut settings = Builder::new()
.http2_initial_stream_window_size(endpoint.init_stream_window_size)
.http2_initial_connection_window_size(endpoint.init_connection_window_size)
.http2_only(true)
.http2_keep_alive_interval(endpoint.http2_keep_alive_interval)
.clone();
let mut settings = Builder::new();

if let Some(val) = endpoint.http2_keep_alive_timeout {
settings.http2_keep_alive_timeout(val);
#[cfg(target_arch = "wasm32")]
{
settings.executor(wasm::Executor);
}

if let Some(val) = endpoint.http2_keep_alive_while_idle {
settings.http2_keep_alive_while_idle(val);
#[cfg(feature = "transport")]
{
settings.http2_keep_alive_interval(endpoint.http2_keep_alive_interval);
if let Some(val) = endpoint.http2_keep_alive_timeout {
settings.http2_keep_alive_timeout(val);
}

if let Some(val) = endpoint.http2_keep_alive_while_idle {
settings.http2_keep_alive_while_idle(val);
}
}

settings
.http2_initial_stream_window_size(endpoint.init_stream_window_size)
.http2_initial_connection_window_size(endpoint.init_connection_window_size)
.http2_only(true);

let stack = ServiceBuilder::new()
.layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone()))
.layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
Expand Down Expand Up @@ -78,6 +102,7 @@ impl Connection {
Self::new(connector, endpoint, false).ready_oneshot().await
}

#[cfg(feature = "transport")]
pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
where
C: Service<Uri> + Send + 'static,
Expand Down
10 changes: 9 additions & 1 deletion tonic/src/transport/service/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::transport::{server::Connected, Certificate};
#[cfg(feature = "transport")]
use crate::transport::Certificate;
use crate::transport::Connected;
use hyper::client::connect::{Connected as HyperConnected, Connection};
use std::io;
#[cfg(feature = "transport")]
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -61,14 +64,17 @@ pub(in crate::transport) trait ConnectedIo: Io + Connected {}

impl<T> ConnectedIo for T where T: Io + Connected {}

#[cfg(feature = "transport")]
pub(crate) struct ServerIo(Pin<Box<dyn ConnectedIo>>);

#[cfg(feature = "transport")]
impl ServerIo {
pub(in crate::transport) fn new<I: ConnectedIo>(io: I) -> Self {
ServerIo(Box::pin(io))
}
}

#[cfg(feature = "transport")]
impl Connected for ServerIo {
fn remote_addr(&self) -> Option<SocketAddr> {
(&*self.0).remote_addr()
Expand All @@ -79,6 +85,7 @@ impl Connected for ServerIo {
}
}

#[cfg(feature = "transport")]
impl AsyncRead for ServerIo {
fn poll_read(
mut self: Pin<&mut Self>,
Expand All @@ -89,6 +96,7 @@ impl AsyncRead for ServerIo {
}
}

#[cfg(feature = "transport")]
impl AsyncWrite for ServerIo {
fn poll_write(
mut self: Pin<&mut Self>,
Expand Down
Loading

0 comments on commit 38817a9

Please sign in to comment.