From fbc1d007aea498be97c21a3ff6c26bd1b87d05cb Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Thu, 8 Apr 2021 09:55:36 +0200 Subject: [PATCH 1/5] Reconnects --- src/remote_router.rs | 47 ++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/remote_router.rs b/src/remote_router.rs index 1803be1..28ee5ff 100644 --- a/src/remote_router.rs +++ b/src/remote_router.rs @@ -1,5 +1,6 @@ use actix::{prelude::*, WrapFuture}; use futures::{channel::oneshot, prelude::*, SinkExt}; +use std::ops::Not; use std::{collections::HashSet, time::Duration}; use crate::connection::ClientInfo; @@ -10,6 +11,7 @@ use crate::{ }; const CONNECT_TIMEOUT: Duration = Duration::from_secs(3); +const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(1); type RemoteConnection = ConnectionRef; @@ -25,14 +27,6 @@ impl Actor for RemoteRouter { fn started(&mut self, ctx: &mut Self::Context) { self.try_connect(ctx); - let _ = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| { - if act.connection.is_none() { - act.clean_pending_calls( - Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())), - ctx, - ); - } - }); } } @@ -41,8 +35,18 @@ impl RemoteRouter { // FIXME: this is `SystemService` and as such cannot get input being initialized // FIXME: but we need to pass gsb_url from yagnad CLI let addr = ya_sb_proto::GsbAddr::default(); - log::info!("trying to connect to: {}", addr); let client_info = self.client_info.clone(); + + log::info!("trying to connect to: {}", addr); + + let timeout_hdl = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| { + if act.connection.is_none() { + act.clean_pending_calls( + Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())), + ctx, + ); + } + }); let connect_fut = connection::transport(addr.clone()) .map_err(move |e| Error::ConnectionFail(addr, e)) .into_actor(self) @@ -65,9 +69,16 @@ impl RemoteRouter { .into_actor(act), ) }) - .then(|v: Result<(), Error>, _, _| { - if let Err(e) = v { - log::warn!("routing error: {}", e); + .then(move |result: Result<(), Error>, _, ctx| { + ctx.cancel_future(timeout_hdl); + match result { + Ok(_) => { + let _ = ctx.run_interval(CONNECTION_CHECK_INTERVAL, Self::check_connection); + } + Err(e) => { + log::warn!("routing error: {}", e); + ctx.stop(); + } } fut::ready(()) }); @@ -107,6 +118,12 @@ impl RemoteRouter { }) .right_future() } + + fn check_connection(&mut self, ctx: &mut ::Context) { + self.connection.as_ref().map(|c| { + c.connected().not().then(|| ctx.stop()); + }); + } } impl Default for RemoteRouter { @@ -123,11 +140,7 @@ impl Default for RemoteRouter { impl Supervised for RemoteRouter { fn restarting(&mut self, _ctx: &mut Self::Context) { if let Some(c) = self.connection.take() { - if c.connected() { - self.connection = Some(c) - } else { - log::error!("lost connection"); - } + c.connected().not().then(|| log::error!("lost connection")); } } } From 943a1e2a3ab61a67fd6673620d4ca8fe1d0d93ce Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Thu, 8 Apr 2021 13:48:16 +0200 Subject: [PATCH 2/5] Disconnect handler --- src/connection.rs | 16 +++++++++++++++- src/remote_router.rs | 39 +++++++++++++++++++++++---------------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 262c536..2dcf537 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -89,7 +89,17 @@ impl ResponseChunk { } #[derive(Default)] -pub struct LocalRouterHandler; +pub struct LocalRouterHandler { + disconnect_h: Option>, +} + +impl LocalRouterHandler { + pub fn new(disconnect_fn: F) -> Self { + Self { + disconnect_h: Some(Box::new(disconnect_fn)), + } + } +} impl CallRequestHandler for LocalRouterHandler { type Reply = Pin>>>; @@ -107,6 +117,10 @@ impl CallRequestHandler for LocalRouterHandler { .forward_bytes_local(&address, &caller, data.as_ref()) .boxed_local() } + + fn on_disconnect(&mut self) { + self.disconnect_h.take().map(|f| f()); + } } impl< diff --git a/src/remote_router.rs b/src/remote_router.rs index 28ee5ff..6593f67 100644 --- a/src/remote_router.rs +++ b/src/remote_router.rs @@ -11,7 +11,6 @@ use crate::{ }; const CONNECT_TIMEOUT: Duration = Duration::from_secs(3); -const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(1); type RemoteConnection = ConnectionRef; @@ -39,7 +38,7 @@ impl RemoteRouter { log::info!("trying to connect to: {}", addr); - let timeout_hdl = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| { + let timeout_h = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| { if act.connection.is_none() { act.clean_pending_calls( Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())), @@ -55,7 +54,8 @@ impl RemoteRouter { Ok(v) => v, Err(e) => return fut::Either::Left(fut::err(e)), }; - let connection = connection::connect(client_info, transport); + let connection = + connection::connect_with_handler(client_info, transport, act.handler(ctx)); act.connection = Some(connection.clone()); act.clean_pending_calls(Ok(connection.clone()), ctx); fut::Either::Right( @@ -70,15 +70,10 @@ impl RemoteRouter { ) }) .then(move |result: Result<(), Error>, _, ctx| { - ctx.cancel_future(timeout_hdl); - match result { - Ok(_) => { - let _ = ctx.run_interval(CONNECTION_CHECK_INTERVAL, Self::check_connection); - } - Err(e) => { - log::warn!("routing error: {}", e); - ctx.stop(); - } + ctx.cancel_future(timeout_h); + if let Err(e) = result { + log::warn!("routing error: {}", e); + ctx.stop(); } fut::ready(()) }); @@ -119,10 +114,22 @@ impl RemoteRouter { .right_future() } - fn check_connection(&mut self, ctx: &mut ::Context) { - self.connection.as_ref().map(|c| { - c.connected().not().then(|| ctx.stop()); - }); + fn handler(&mut self, ctx: &mut ::Context) -> LocalRouterHandler { + let (tx, rx) = oneshot::channel(); + + rx.into_actor(self) + .map(|_, this, ctx| { + this.connection.as_ref().map(|c| { + c.connected().not().then(|| log::warn!("connection lost")); + }); + // restarts the actor + ctx.stop(); + }) + .spawn(ctx); + + LocalRouterHandler::new(|| { + let _ = tx.send(()); + }) } } From 357b8bc3873b5b1b370dea343a9b9f55dc81ebcf Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Thu, 8 Apr 2021 15:39:52 +0200 Subject: [PATCH 3/5] Remove duplicate log messages --- src/remote_router.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/remote_router.rs b/src/remote_router.rs index 6593f67..3092a2f 100644 --- a/src/remote_router.rs +++ b/src/remote_router.rs @@ -146,9 +146,7 @@ impl Default for RemoteRouter { impl Supervised for RemoteRouter { fn restarting(&mut self, _ctx: &mut Self::Context) { - if let Some(c) = self.connection.take() { - c.connected().not().then(|| log::error!("lost connection")); - } + let _ = self.connection.take(); } } From 6a769835d7a905798e2a0adfe4957ab9026cb318 Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Mon, 12 Apr 2021 16:03:22 +0200 Subject: [PATCH 4/5] Log and restart on connection timeout --- src/remote_router.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/remote_router.rs b/src/remote_router.rs index 3092a2f..0efef29 100644 --- a/src/remote_router.rs +++ b/src/remote_router.rs @@ -44,6 +44,8 @@ impl RemoteRouter { Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())), ctx, ); + log::warn!("connection timed out after {:?}", CONNECT_TIMEOUT); + ctx.stop(); } }); let connect_fut = connection::transport(addr.clone()) From 6d0b60da429037566bd89b5c7d0c1378544766ba Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Mon, 12 Apr 2021 17:47:20 +0200 Subject: [PATCH 5/5] v0.4.4 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6aee76a..12d0a49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "ya-service-bus" -version = "0.4.3" +version = "0.4.4" dependencies = [ "actix", "actix-rt", diff --git a/Cargo.toml b/Cargo.toml index e4ec7cf..f7b8753 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ya-service-bus" -version = "0.4.3" +version = "0.4.4" authors = ["Golem Factory "] edition = "2018" homepage = "https://github.com/golemfactory/ya-service-bus"