Skip to content

Commit

Permalink
fix(client): Use Stream instead of TrySteam for client calls (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
alce authored and LucioFranco committed Oct 10, 2019
1 parent 8cddf8a commit 7eda823
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 21 deletions.
4 changes: 2 additions & 2 deletions tonic-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn generate_client_streaming(method: &Method, proto: &str, path: String) -> Toke
quote! {
pub async fn #ident<S>(&mut self, request: tonic::Request<S>)
-> Result<tonic::Response<#response>, tonic::Status>
where S: Stream<Item = Result<#request, tonic::Status>> + Send + 'static,
where S: Stream<Item = #request> + Send + 'static,
{
self.ready().await?;
let codec = tonic::codec::ProstCodec::new();
Expand All @@ -151,7 +151,7 @@ fn generate_streaming(method: &Method, proto: &str, path: String) -> TokenStream
quote! {
pub async fn #ident<S>(&mut self, request: tonic::Request<S>)
-> Result<tonic::Response<tonic::codec::Streaming<#response>>, tonic::Status>
where S: Stream<Item = Result<#request, tonic::Status>> + Send + 'static,
where S: Stream<Item = #request> + Send + 'static,
{
self.ready().await?;
let codec = tonic::codec::ProstCodec::new();
Expand Down
2 changes: 1 addition & 1 deletion tonic-examples/src/routeguide/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("FEATURE = {:?}", response);

let outbound = async_stream::try_stream! {
let outbound = async_stream::stream! {
let mut interval = Interval::new_interval(Duration::from_secs(1));

while let Some(time) = interval.next().await {
Expand Down
19 changes: 7 additions & 12 deletions tonic-interop/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,10 @@ pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAsser
// }

pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
let requests = REQUEST_LENGTHS
.iter()
.map(|len| StreamingInputCallRequest {
payload: Some(crate::client_payload(*len as usize)),
..Default::default()
})
.map(|v| Ok(v));
let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest {
payload: Some(crate::client_payload(*len as usize)),
..Default::default()
});

let stream = stream::iter(requests);

Expand Down Expand Up @@ -154,9 +151,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
let (mut tx, rx) = mpsc::unbounded_channel();
tx.try_send(make_ping_pong_request(0)).unwrap();

let result = client
.full_duplex_call(Request::new(rx.map(|s| Ok(s))))
.await;
let result = client.full_duplex_call(Request::new(rx)).await;

assertions.push(test_assert!(
"call must be successful",
Expand Down Expand Up @@ -272,7 +267,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V
let result = client.unary_call(Request::new(simple_req)).await;
validate_response(result, assertions);

let stream = stream::iter(vec![Ok(duplex_req)]);
let stream = stream::iter(vec![duplex_req]);
let result = match client.full_duplex_call(Request::new(stream)).await {
Ok(response) => {
let stream = response.into_inner();
Expand Down Expand Up @@ -359,7 +354,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestA
req_unary.metadata_mut().insert(key1, value1.clone());
req_unary.metadata_mut().insert_bin(key2, value2.clone());

let stream = stream::iter(vec![Ok(make_ping_pong_request(0))]);
let stream = stream::iter(vec![make_ping_pong_request(0)]);
let mut req_stream = Request::new(stream);
req_stream.metadata_mut().insert(key1, value1.clone());
req_stream.metadata_mut().insert_bin(key2, value2.clone());
Expand Down
8 changes: 4 additions & 4 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<T> Grpc<T> {
M1: Send + 'static,
M2: Send + 'static,
{
let request = request.map(|m| stream::once(future::ok(m)));
let request = request.map(|m| stream::once(future::ready(m)));
self.client_streaming(request, path, codec).await
}

Expand All @@ -81,7 +81,7 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
S: Stream<Item = Result<M1, Status>> + Send + 'static,
S: Stream<Item = M1> + Send + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<T> Grpc<T> {
M1: Send + 'static,
M2: Send + 'static,
{
let request = request.map(|m| stream::once(future::ok(m)));
let request = request.map(|m| stream::once(future::ready(m)));
self.streaming(request, path, codec).await
}

Expand All @@ -134,7 +134,7 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
S: Stream<Item = Result<M1, Status>> + Send + 'static,
S: Stream<Item = M1> + Send + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub(crate) fn encode_client<T, U>(
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
U: Stream<Item = T::Item>,
{
let stream = encode(encoder, source).into_stream();
let stream = encode(encoder, source.map(|x| Ok(x))).into_stream();
EncodeBody::new_client(stream)
}

Expand Down

0 comments on commit 7eda823

Please sign in to comment.