Skip to content

Commit

Permalink
fix(tonic): flush accumulated ready messages when status received
Browse files Browse the repository at this point in the history
hyperium#1423 introduced logic to buffer multiple ready messages in order
to amortize the cost of sends to the underlying transport. This
also introduced a change in behavior for tonic in the following
scenario:

A stream of ready messages less than the yield threshold is
trailed by a status. Previously the ready messages would all
have been yielded from the stream and sent, followed by the
status. After the change was introduced the status is yielded
from the stream and sent but the accumulated ready messages in the
buffer are never sent out.

This change adjusts the logic to restore the previous behavior
while still retaining the amoritization benefits. Namely it
flushes the accumulated ready messages prior to yielding the status
ensuring they are sent out from the stream in the order they are read.
  • Loading branch information
jfoster-twilio committed Jun 25, 2024
1 parent f679dcf commit e299a35
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
49 changes: 49 additions & 0 deletions tests/integration_tests/tests/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,52 @@ async fn status_from_server_stream_with_source() {
let source = error.source().unwrap();
source.downcast_ref::<tonic::transport::Error>().unwrap();
}

#[tokio::test]
async fn message_and_then_status_from_server_stream() {
integration_tests::trace_init();

struct Svc;

#[tonic::async_trait]
impl test_stream_server::TestStream for Svc {
type StreamCallStream = Stream<OutputStream>;

async fn stream_call(
&self,
_: Request<InputStream>,
) -> Result<Response<Self::StreamCallStream>, Status> {
let s = tokio_stream::iter(vec![
Ok(OutputStream {}),
Err::<OutputStream, _>(Status::unavailable("foo")),
]);
Ok(Response::new(Box::pin(s) as Self::StreamCallStream))
}
}

let svc = test_stream_server::TestStreamServer::new(Svc);

tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve("127.0.0.1:1340".parse().unwrap())
.await
.unwrap();
});

tokio::time::sleep(Duration::from_millis(100)).await;

let mut client = test_stream_client::TestStreamClient::connect("http://127.0.0.1:1340")
.await
.unwrap();

let mut stream = client
.stream_call(InputStream {})
.await
.unwrap()
.into_inner();

assert_eq!(stream.message().await.unwrap(), Some(OutputStream {}));
assert_eq!(stream.message().await.unwrap_err().message(), "foo");
assert_eq!(stream.message().await.unwrap(), None);
}
13 changes: 12 additions & 1 deletion tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ where
max_message_size: Option<usize>,
buf: BytesMut,
uncompression_buf: BytesMut,
error: Option<Status>,
}

impl<T, U> EncodedBytes<T, U>
Expand Down Expand Up @@ -112,6 +113,7 @@ where
max_message_size,
buf,
uncompression_buf,
error: None,
}
}
}
Expand All @@ -131,9 +133,14 @@ where
max_message_size,
buf,
uncompression_buf,
error,
} = self.project();
let buffer_settings = encoder.buffer_settings();

if let Some(status) = error.take() {
return Poll::Ready(Some(Err(status)));
}

loop {
match source.as_mut().poll_next(cx) {
Poll::Pending if buf.is_empty() => {
Expand Down Expand Up @@ -163,7 +170,11 @@ where
}
}
Poll::Ready(Some(Err(status))) => {
return Poll::Ready(Some(Err(status)));
if buf.is_empty() {
return Poll::Ready(Some(Err(status)));
}
*error = Some(status);
return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze())));
}
}
}
Expand Down

0 comments on commit e299a35

Please sign in to comment.