Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(codec): Enforce encoders/decoders are Sync #84

Merged
merged 5 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tonic-build/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream {

quote! {
#stream_doc
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + 'static;
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + Sync + 'static;

#method_doc
async fn #name(&self, request: tonic::Request<#req_message>)
Expand All @@ -164,7 +164,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream {

quote! {
#stream_doc
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + 'static;
type #stream: Stream<Item = Result<#res_message, tonic::Status>> + Send + Sync + 'static;

#method_doc
async fn #name(&self, request: tonic::Request<tonic::Streaming<#req_message>>)
Expand Down
5 changes: 3 additions & 2 deletions tonic-examples/src/routeguide/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ impl server::RouteGuide for RouteGuide {
Ok(Response::new(summary))
}

type RouteChatStream = Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>>;
type RouteChatStream =
Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>>;

async fn route_chat(
&self,
Expand Down Expand Up @@ -138,7 +139,7 @@ impl server::RouteGuide for RouteGuide {

Ok(Response::new(Box::pin(output)
as Pin<
Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>,
Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>,
>))
}
}
Expand Down
5 changes: 3 additions & 2 deletions tonic-interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ pub struct TestService;

type Result<T> = std::result::Result<Response<T>, Status>;
type Streaming<T> = Request<tonic::Streaming<T>>;
type Stream<T> =
Pin<Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
type Stream<T> = Pin<
Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + Sync + 'static>,
>;

#[tonic::async_trait]
impl pb::server::TestService for TestService {
Expand Down
9 changes: 5 additions & 4 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ tls = []
name = "bench_main"
harness = false

[dev-dependencies]
rand = "0.7.2"
criterion = "0.3"

[dependencies]
bytes = "0.4"
futures-core-preview = "=0.3.0-alpha.19"
Expand Down Expand Up @@ -83,6 +79,11 @@ openssl1 = { package = "openssl", version = "0.10", optional = true }
# rustls
tokio-rustls = { version = "=0.12.0-alpha.4", optional = true }

[dev-dependencies]
static_assertions = "1.0"
rand = "0.7.2"
criterion = "0.3"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
10 changes: 5 additions & 5 deletions tonic/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
pub(crate) type BytesBuf = <Bytes as IntoBuf>::Buf;

/// A trait alias for [`http_body::Body`].
pub trait Body: sealed::Sealed {
pub trait Body: sealed::Sealed + Send + Sync {
/// The body data type.
type Data: Buf;
/// The errors produced from the body.
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait Body: sealed::Sealed {

impl<T> Body for T
where
T: HttpBody,
T: HttpBody + Send + Sync + 'static,
T::Error: Into<Error>,
{
type Data = T::Data;
Expand Down Expand Up @@ -83,7 +83,7 @@ mod sealed {

/// A type erased http body.
pub struct BoxBody {
inner: Pin<Box<dyn Body<Data = BytesBuf, Error = Status> + Send + 'static>>,
inner: Pin<Box<dyn Body<Data = BytesBuf, Error = Status> + Send + Sync + 'static>>,
}

struct MapBody<B>(B);
Expand All @@ -92,7 +92,7 @@ impl BoxBody {
/// Create a new `BoxBody` mapping item and error to the default types.
pub fn new<B>(inner: B) -> Self
where
B: Body<Data = BytesBuf, Error = Status> + Send + 'static,
B: Body<Data = BytesBuf, Error = Status> + Send + Sync + 'static,
{
BoxBody {
inner: Box::pin(inner),
Expand All @@ -102,7 +102,7 @@ impl BoxBody {
/// Create a new `BoxBody` mapping item and error to the default types.
pub fn map_from<B>(inner: B) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
{
Expand Down
20 changes: 10 additions & 10 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ impl<T> Grpc<T> {
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let request = request.map(|m| stream::once(future::ready(m)));
self.client_streaming(request, path, codec).await
Expand All @@ -81,10 +81,10 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
S: Stream<Item = M1> + Send + 'static,
S: Stream<Item = M1> + Send + Sync + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let (mut parts, body) = self.streaming(request, path, codec).await?.into_parts();

Expand Down Expand Up @@ -115,8 +115,8 @@ impl<T> Grpc<T> {
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let request = request.map(|m| stream::once(future::ready(m)));
self.streaming(request, path, codec).await
Expand All @@ -134,10 +134,10 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
S: Stream<Item = M1> + Send + 'static,
S: Stream<Item = M1> + Send + Sync + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let mut parts = Parts::default();
parts.path_and_query = Some(path);
Expand Down
21 changes: 12 additions & 9 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const BUFFER_SIZE: usize = 8 * 1024;
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
/// to fetch the message stream and trailing metadata
pub struct Streaming<T> {
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + Sync + 'static>,
body: BoxBody,
state: State,
direction: Direction,
Expand All @@ -45,40 +45,40 @@ enum Direction {
impl<T> Streaming<T> {
pub(crate) fn new_response<B, D>(decoder: D, body: B, status_code: StatusCode) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::Response(status_code))
}

pub(crate) fn new_empty<B, D>(decoder: D, body: B) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::EmptyResponse)
}

pub(crate) fn new_request<B, D>(decoder: D, body: B) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::Request)
}

fn new<B, D>(decoder: D, body: B, direction: Direction) -> Self
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + 'static,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self {
decoder: Box::new(decoder),
Expand Down Expand Up @@ -291,3 +291,6 @@ impl<T> fmt::Debug for Streaming<T> {
f.debug_struct("Streaming").finish()
}
}

#[cfg(test)]
static_assertions::assert_impl_all!(Streaming<()>: Send, Sync);
12 changes: 7 additions & 5 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ pub(crate) fn encode_server<T, U>(
source: U,
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
T: Encoder<Error = Status> + Send + Sync + 'static,
T::Item: Send + Sync,
U: Stream<Item = Result<T::Item, Status>> + Send + Sync + 'static,
{
let stream = encode(encoder, source).into_stream();
EncodeBody::new_server(stream)
Expand All @@ -28,8 +29,9 @@ pub(crate) fn encode_client<T, U>(
source: U,
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
where
T: Encoder<Error = Status>,
U: Stream<Item = T::Item>,
T: Encoder<Error = Status> + Send + Sync + 'static,
T::Item: Send + Sync,
U: Stream<Item = T::Item> + Send + Sync + 'static,
{
let stream = encode(encoder, source.map(|x| Ok(x))).into_stream();
EncodeBody::new_client(stream)
Expand Down Expand Up @@ -88,7 +90,7 @@ pub(crate) struct EncodeBody<S> {

impl<S> EncodeBody<S>
where
S: Stream<Item = Result<crate::body::BytesBuf, Status>>,
S: Stream<Item = Result<crate::body::BytesBuf, Status>> + Send + Sync + 'static,
{
pub(crate) fn new_client(inner: S) -> Self {
Self {
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub trait Codec: Default {
type Decode: Send + 'static;

/// The encoder that can encode a message.
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + 'static;
type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + Sync + 'static;
/// The encoder that can decode a message.
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + 'static;
type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + Sync + 'static;

/// Fetch the encoder.
fn encoder(&mut self) -> Self::Encoder;
Expand Down
6 changes: 3 additions & 3 deletions tonic/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub trait IntoRequest<T>: sealed::Sealed {
/// ```
pub trait IntoStreamingRequest: sealed::Sealed {
/// The RPC request stream type
type Stream: Stream<Item = Self::Message> + Send + 'static;
type Stream: Stream<Item = Self::Message> + Send + Sync + 'static;

/// The RPC request type
type Message;
Expand Down Expand Up @@ -182,7 +182,7 @@ impl<T> IntoRequest<T> for Request<T> {

impl<T> IntoStreamingRequest for T
where
T: Stream + Send + 'static,
T: Stream + Send + Sync + 'static,
{
type Stream = T;
type Message = T::Item;
Expand All @@ -194,7 +194,7 @@ where

impl<T> IntoStreamingRequest for Request<T>
where
T: Stream + Send + 'static,
T: Stream + Send + Sync + 'static,
{
type Stream = T;
type Message = T::Item;
Expand Down
19 changes: 10 additions & 9 deletions tonic/src/server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Grpc<T> {
impl<T> Grpc<T>
where
T: Codec,
T::Encode: Sync,
{
/// Creates a new gRPC client with the provided [`Codec`].
pub fn new(codec: T) -> Self {
Expand All @@ -40,7 +41,7 @@ where
) -> http::Response<BoxBody>
where
S: UnaryService<T::Decode, Response = T::Encode>,
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand Down Expand Up @@ -70,8 +71,8 @@ where
) -> http::Response<BoxBody>
where
S: ServerStreamingService<T::Decode, Response = T::Encode>,
S::ResponseStream: Send + 'static,
B: Body + Send + 'static,
S::ResponseStream: Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand All @@ -95,7 +96,7 @@ where
) -> http::Response<BoxBody>
where
S: ClientStreamingService<T::Decode, Response = T::Encode>,
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send + 'static,
B::Error: Into<crate::Error> + Send + 'static,
{
Expand All @@ -115,8 +116,8 @@ where
) -> http::Response<BoxBody>
where
S: StreamingService<T::Decode, Response = T::Encode> + Send,
S::ResponseStream: Send + 'static,
B: Body + Send + 'static,
S::ResponseStream: Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand All @@ -130,7 +131,7 @@ where
request: http::Request<B>,
) -> Result<Request<T::Decode>, Status>
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand Down Expand Up @@ -158,7 +159,7 @@ where
request: http::Request<B>,
) -> Request<Streaming<T::Decode>>
where
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Data: Into<Bytes> + Send,
B::Error: Into<crate::Error> + Send,
{
Expand All @@ -170,7 +171,7 @@ where
response: Result<crate::Response<B>, Status>,
) -> http::Response<BoxBody>
where
B: TryStream<Ok = T::Encode, Error = Status> + Send + 'static,
B: TryStream<Ok = T::Encode, Error = Status> + Send + Sync + 'static,
{
match response {
Ok(r) => {
Expand Down