Skip to content

Commit

Permalink
fix(transport): reconnect lazy connections after first failure (#458)
Browse files Browse the repository at this point in the history
* fix(transport): reconnect lazy connections after first failure

Channels created with lazy connections never try to reconnect if the
first connection attempt fails. This is because `Reconnect` returns
`Poll::Ready(Err)` on poll_ready and the service is considered dead.

This change passes a flag to `Reconnect` to signal whether the connection
is intended to be lazy. When it is, `Reconnect` stores the connection error
instead of failing `poll_ready`, and returns that error on the next call.

fixes #452
  • Loading branch information
alce committed Sep 23, 2020
1 parent cea990b commit e9910d1
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 26 deletions.
63 changes: 50 additions & 13 deletions tests/integration_tests/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,22 @@ use integration_tests::pb::{test_client::TestClient, test_server, Input, Output}
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::oneshot;
use tonic::{transport::Server, Request, Response, Status};
use tonic::{
transport::{Endpoint, Server},
Request, Response, Status,
};

/// Test service wrapping a shared, take-once `oneshot::Sender<()>`.
///
/// `unary_call` takes the sender out of the `Option` and fires it, so the
/// sender can be used exactly once per `Svc` instance.
struct Svc(Arc<Mutex<Option<oneshot::Sender<()>>>>);

#[tonic::async_trait]
impl test_server::Test for Svc {
    /// Handles a unary request by firing the stored one-shot sender and
    /// replying with an empty `Output`.
    ///
    /// Panics if the mutex is poisoned, if the sender was already taken by an
    /// earlier call, or if the receiving half has been dropped.
    async fn unary_call(&self, _: Request<Input>) -> Result<Response<Output>, Status> {
        // The guard stays alive until the end of the function, as before.
        let mut slot = self.0.lock().unwrap();
        let notifier = slot.take().unwrap();
        notifier.send(()).unwrap();

        let reply = Response::new(Output {});
        Ok(reply)
    }
}

#[tokio::test]
async fn connect_returns_err() {
Expand All @@ -14,18 +29,6 @@ async fn connect_returns_err() {

#[tokio::test]
async fn connect_returns_err_via_call_after_connected() {
struct Svc(Arc<Mutex<Option<oneshot::Sender<()>>>>);

#[tonic::async_trait]
impl test_server::Test for Svc {
async fn unary_call(&self, _: Request<Input>) -> Result<Response<Output>, Status> {
let mut l = self.0.lock().unwrap();
l.take().unwrap().send(()).unwrap();

Ok(Response::new(Output {}))
}
}

let (tx, rx) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(tx)));
let svc = test_server::TestServer::new(Svc(sender));
Expand Down Expand Up @@ -53,3 +56,37 @@ async fn connect_returns_err_via_call_after_connected() {

jh.await.unwrap();
}

/// Checks that a channel built with `connect_lazy` keeps working after its
/// first connection attempt fails.
///
/// Sequence exercised: a call made before any server is started must error,
/// a call made after the server is spawned must succeed, and a call made
/// after the server has shut down must error again.
#[tokio::test]
async fn connect_lazy_reconnects_after_first_failure() {
// Shutdown plumbing: `tx` is handed to `Svc` (whose `unary_call` fires it)
// and `rx` is passed to `serve_with_shutdown` below, so the first request
// the server handles also triggers its shutdown.
let (tx, rx) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(tx)));
let svc = test_server::TestServer::new(Svc(sender));

// Build the channel before the server is started on this address.
let channel = Endpoint::from_static("http://127.0.0.1:1339")
.connect_lazy()
.unwrap();

let mut client = TestClient::new(channel);

// First call should fail, the server is not running
client.unary_call(Request::new(Input {})).await.unwrap_err();

// Start the server now, second call should succeed
let jh = tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_shutdown("127.0.0.1:1339".parse().unwrap(), rx.map(drop))
.await
.unwrap();
});

// NOTE(review): presumably this pause gives the spawned server time to start
// listening; the test is timing-based, so confirm 100ms is enough on slow CI.
tokio::time::delay_for(Duration::from_millis(100)).await;
client.unary_call(Request::new(Input {})).await.unwrap();

// The server shut down, third call should fail
tokio::time::delay_for(Duration::from_millis(100)).await;
client.unary_call(Request::new(Input {})).await.unwrap_err();

// Surface any panic from the server task (e.g. a failed `serve_with_shutdown`).
jh.await.unwrap();
}
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Endpoint {
#[cfg(not(feature = "tls"))]
let connector = service::connector(http);

Channel::new(connector, self.clone())
Ok(Channel::new(connector, self.clone()))
}

/// Connect with a custom connector.
Expand Down
6 changes: 3 additions & 3 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Channel {
(Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
}

pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
Expand All @@ -139,10 +139,10 @@ impl Channel {
{
let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);

let svc = Connection::new(connector, endpoint).map_err(super::Error::from_source)?;
let svc = Connection::lazy(connector, endpoint);
let svc = Buffer::new(Either::A(svc), buffer_size);

Ok(Channel { svc })
Channel { svc }
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
Expand Down
20 changes: 15 additions & 5 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) struct Connection {
}

impl Connection {
pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
where
C: Service<Uri> + Send + 'static,
C::Error: Into<crate::Error> + Send,
Expand Down Expand Up @@ -61,13 +61,13 @@ impl Connection {
.into_inner();

let connector = HyperConnect::new(connector, settings);
let conn = Reconnect::new(connector, endpoint.uri.clone());
let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy);

let inner = stack.layer(conn);

Ok(Self {
Self {
inner: BoxService::new(inner),
})
}
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
Expand All @@ -77,7 +77,17 @@ impl Connection {
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
Self::new(connector, endpoint)?.ready_oneshot().await
Self::new(connector, endpoint, false).ready_oneshot().await
}

/// Builds a `Connection` flagged as lazy.
///
/// Delegates to `Self::new` with the lazy flag set to `true`; unlike
/// `connect`, it does not await readiness before returning.
pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
where
    C: Service<Uri> + Send + 'static,
    C::Error: Into<crate::Error> + Send,
    C::Future: Unpin + Send,
    C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
{
    // Name the flag so the call site reads without looking up `new`'s signature.
    let is_lazy = true;
    Self::new(connector, endpoint, is_lazy)
}
}

Expand Down
10 changes: 6 additions & 4 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ where
target: Target,
error: Option<M::Error>,
has_been_connected: bool,
is_lazy: bool,
}

#[derive(Debug)]
Expand All @@ -32,13 +33,14 @@ impl<M, Target> Reconnect<M, Target>
where
M: Service<Target>,
{
pub(crate) fn new(mk_service: M, target: Target) -> Self {
/// Creates a `Reconnect` that starts in `State::Idle`, with no stored error
/// and `has_been_connected` set to `false`.
///
/// * `mk_service` - the service stored as the connection maker.
/// * `target` - the target value stored for later use with `mk_service`.
/// * `is_lazy` - signals that the connection is intended to be lazy.
pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
Reconnect {
mk_service,
state: State::Idle,
target,
error: None,
has_been_connected: false,
is_lazy,
}
}
}
Expand Down Expand Up @@ -89,11 +91,11 @@ where

state = State::Idle;

if self.has_been_connected {
if !(self.has_been_connected || self.is_lazy) {
return Poll::Ready(Err(e.into()));
} else {
self.error = Some(e.into());
break;
} else {
return Poll::Ready(Err(e.into()));
}
}
}
Expand Down

0 comments on commit e9910d1

Please sign in to comment.