Skip to content
This repository has been archived by the owner on Feb 4, 2021. It is now read-only.

Commit

Permalink
fix(codec): Enforce encoders/decoders are Sync (hyperium#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco authored and rabbitinspace committed Jan 1, 2020
1 parent 18c6aa3 commit c7096a9
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 53 deletions.
4 changes: 2 additions & 2 deletions tonic-build/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream {

quote! {
#stream_doc
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + 'static;
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + Sync + 'static;

#method_doc
async fn #name(&self, request: tonic::Request<#req_message>)
Expand All @@ -142,7 +142,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream {

quote! {
#stream_doc
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + 'static;
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + Sync + 'static;

#method_doc
async fn #name(&self, request: tonic::Request<tonic::Streaming<#req_message>>)
Expand Down
5 changes: 3 additions & 2 deletions tonic-examples/src/routeguide/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ impl server::RouteGuide for RouteGuide {
Ok(Response::new(summary))
}

type RouteChatStream = Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>>;
type RouteChatStream =
Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>>;

async fn route_chat(
&self,
Expand Down Expand Up @@ -138,7 +139,7 @@ impl server::RouteGuide for RouteGuide {

Ok(Response::new(Box::pin(output)
as Pin<
Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>,
Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>,
>))
}
}
Expand Down
5 changes: 3 additions & 2 deletions tonic-interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ pub struct TestService;

type Result<T> = std::result::Result<Response<T>, Status>;
type Streaming<T> = Request<tonic::Streaming<T>>;
type Stream<T> =
Pin<Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
type Stream<T> = Pin<
Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + Sync + 'static>,
>;

#[tonic::async_trait]
impl pb::server::TestService for TestService {
Expand Down
9 changes: 5 additions & 4 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ tls = []
name = "bench_main"
harness = false

[dev-dependencies]
rand = "0.7.2"
criterion = "0.3"

[dependencies]
bytes = "0.4"
futures-core-preview = "=0.3.0-alpha.19"
Expand Down Expand Up @@ -83,6 +79,11 @@ openssl1 = { package = "openssl", version = "0.10", optional = true }
# rustls
tokio-rustls = { version = "=0.12.0-alpha.4", optional = true }

[dev-dependencies]
static_assertions = "1.0"
rand = "0.7.2"
criterion = "0.3"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
10 changes: 5 additions & 5 deletions tonic/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
pub(crate) type BytesBuf = <Bytes as IntoBuf>::Buf;

/// A trait alias for [`http_body::Body`].
pub trait Body: sealed::Sealed {
pub trait Body: sealed::Sealed + Send + Sync {
/// The body data type.
type Data: Buf;
/// The errors produced from the body.
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait Body: sealed::Sealed {

impl<T> Body for T
where
T: HttpBody,
T: HttpBody + Send + Sync + 'static,
T::Error: Into<Error>,
{
type Data = T::Data;
Expand Down Expand Up @@ -83,7 +83,7 @@ mod sealed {

/// A type erased http body.
pub struct BoxBody {
inner: Pin<Box<dyn Body<Data = BytesBuf, Error = Status> + Send + 'static>>,
inner: Pin<Box<dyn Body<Data = BytesBuf, Error = Status> + Send + Sync + 'static>>,
}

struct MapBody<B>(B);
Expand All @@ -92,7 +92,7 @@ impl BoxBody {
/// Create a new `BoxBody` mapping item and error to the default types.
pub fn new<B>(inner: B) -> Self
where
B: Body<Data = BytesBuf, Error = Status> + Send + 'static,
B: Body<Data = BytesBuf, Error = Status> + Send + Sync + 'static,
{
BoxBody {
inner: Box::pin(inner),
Expand All @@ -102,7 +102,7 @@ impl BoxBody {
/// Create a new `BoxBody` mapping item and error to the default types.
pub fn map_from<B>(inner: B) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
{
Expand Down
20 changes: 10 additions & 10 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ impl<T> Grpc<T> {
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let request = request.map(|m| stream::once(future::ready(m)));
self.client_streaming(request, path, codec).await
Expand All @@ -81,10 +81,10 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
S: Stream<Item = M1> + Send + 'static,
S: Stream<Item = M1> + Send + Sync + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let (mut parts, body) = self.streaming(request, path, codec).await?.into_parts();

Expand Down Expand Up @@ -115,8 +115,8 @@ impl<T> Grpc<T> {
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let request = request.map(|m| stream::once(future::ready(m)));
self.streaming(request, path, codec).await
Expand All @@ -134,10 +134,10 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
S: Stream<Item = M1> + Send + 'static,
S: Stream<Item = M1> + Send + Sync + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let mut parts = Parts::default();
parts.path_and_query = Some(path);
Expand Down
21 changes: 12 additions & 9 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const BUFFER_SIZE: usize = 8 * 1024;
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
/// to fetch the message stream and trailing metadata
pub struct Streaming<T> {
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + Sync + 'static>,
body: BoxBody,
state: State,
direction: Direction,
Expand All @@ -45,40 +45,40 @@ enum Direction {
impl<T> Streaming<T> {
pub(crate) fn new_response<B, D>(decoder: D, body: B, status_code: StatusCode) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::Response(status_code))
}

pub(crate) fn new_empty<B, D>(decoder: D, body: B) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::EmptyResponse)
}

pub(crate) fn new_request<B, D>(decoder: D, body: B) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::Request)
}

fn new<B, D>(decoder: D, body: B, direction: Direction) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self {
decoder: Box::new(decoder),
Expand Down Expand Up @@ -291,3 +291,6 @@ impl<T> fmt::Debug for Streaming<T> {
f.debug_struct("Streaming").finish()
}
}

#[cfg(test)]
static_assertions::assert_impl_all!(Streaming<()>: Send, Sync);
12 changes: 7 additions & 5 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ pub(crate) fn encode_server<T, U>(
source: U,
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
T: Encoder<Error = Status> + Send + Sync + 'static,
T::Item: Send + Sync,
U: Stream<Item = Result<T::Item, Status>> + Send + Sync + 'static,
{
let stream = encode(encoder, source).into_stream();
EncodeBody::new_server(stream)
Expand All @@ -28,8 +29,9 @@ pub(crate) fn encode_client<T, U>(
source: U,
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
where
T: Encoder<Error = Status>,
U: Stream<Item = T::Item>,
T: Encoder<Error = Status> + Send + Sync + 'static,
T::Item: Send + Sync,
U: Stream<Item = T::Item> + Send + Sync + 'static,
{
let stream = encode(encoder, source.map(|x| Ok(x))).into_stream();
EncodeBody::new_client(stream)
Expand Down Expand Up @@ -88,7 +90,7 @@ pub(crate) struct EncodeBody<S> {

impl<S> EncodeBody<S>
where
S: Stream<Item = Result<crate::body::BytesBuf, Status>>,
S: Stream<Item = Result<crate::body::BytesBuf, Status>> + Send + Sync + 'static,
{
pub(crate) fn new_client(inner: S) -> Self {
Self {
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub trait Codec: Default {
type Decode: Send + 'static;

/// The encoder that can encode a message.
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + 'static;
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + Sync + 'static;
/// The encoder that can decode a message.
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + 'static;
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + Sync + 'static;

/// Fetch the encoder.
fn encoder(&mut self) -> Self::Encoder;
Expand Down
6 changes: 3 additions & 3 deletions tonic/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub trait IntoRequest<T>: sealed::Sealed {
/// ```
pub trait IntoStreamingRequest: sealed::Sealed {
/// The RPC request stream type
type Stream: Stream<Item = Self::Message> + Send + 'static;
type Stream: Stream<Item = Self::Message> + Send + Sync + 'static;

/// The RPC request type
type Message;
Expand Down Expand Up @@ -182,7 +182,7 @@ impl<T> IntoRequest<T> for Request<T> {

impl<T> IntoStreamingRequest for T
where
T: Stream + Send + 'static,
T: Stream + Send + Sync + 'static,
{
type Stream = T;
type Message = T::Item;
Expand All @@ -194,7 +194,7 @@ where

impl<T> IntoStreamingRequest for Request<T>
where
T: Stream + Send + 'static,
T: Stream + Send + Sync + 'static,
{
type Stream = T;
type Message = T::Item;
Expand Down
19 changes: 10 additions & 9 deletions tonic/src/server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Grpc<T> {
impl<T> Grpc<T>
where
T: Codec,
T::Encode: Sync,
{
/// Creates a new gRPC client with the provided [`Codec`].
pub fn new(codec: T) -> Self {
Expand All @@ -40,7 +41,7 @@ where
) -> http::Response<BoxBody>
where
S: UnaryService<T::Decode, Response = T::Encode>,
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand Down Expand Up @@ -70,8 +71,8 @@ where
) -> http::Response<BoxBody>
where
S: ServerStreamingService<T::Decode, Response = T::Encode>,
S::ResponseStream: Send + 'static,
B: Body + Send + 'static,
S::ResponseStream: Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand All @@ -95,7 +96,7 @@ where
) -> http::Response<BoxBody>
where
S: ClientStreamingService<T::Decode, Response = T::Encode>,
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send + 'static,
B::Error: Into<crate::Error> + Send + 'static,
{
Expand All @@ -115,8 +116,8 @@ where
) -> http::Response<BoxBody>
where
S: StreamingService<T::Decode, Response = T::Encode> + Send,
S::ResponseStream: Send + 'static,
B: Body + Send + 'static,
S::ResponseStream: Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand All @@ -130,7 +131,7 @@ where
request: http::Request<B>,
) -> Result<Request<T::Decode>, Status>
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand Down Expand Up @@ -158,7 +159,7 @@ where
request: http::Request<B>,
) -> Request<Streaming<T::Decode>>
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand All @@ -170,7 +171,7 @@ where
response: Result<crate::Response<B>, Status>,
) -> http::Response<BoxBody>
where
B: TryStream<Ok = T::Encode, Error = Status> + Send + 'static,
B: TryStream<Ok = T::Encode, Error = Status> + Send + Sync + 'static,
{
match response {
Ok(r) => {
Expand Down

0 comments on commit c7096a9

Please sign in to comment.