diff --git a/Cargo.lock b/Cargo.lock index 6aee76a..12d0a49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "ya-service-bus" -version = "0.4.3" +version = "0.4.4" dependencies = [ "actix", "actix-rt", diff --git a/Cargo.toml b/Cargo.toml index e4ec7cf..f7b8753 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ya-service-bus" -version = "0.4.3" +version = "0.4.4" authors = ["Golem Factory "] edition = "2018" homepage = "https://github.com/golemfactory/ya-service-bus" diff --git a/src/connection.rs b/src/connection.rs index 262c536..2dcf537 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -89,7 +89,17 @@ impl ResponseChunk { } #[derive(Default)] -pub struct LocalRouterHandler; +pub struct LocalRouterHandler { + disconnect_h: Option>, +} + +impl LocalRouterHandler { + pub fn new(disconnect_fn: F) -> Self { + Self { + disconnect_h: Some(Box::new(disconnect_fn)), + } + } +} impl CallRequestHandler for LocalRouterHandler { type Reply = Pin>>>; @@ -107,6 +117,10 @@ impl CallRequestHandler for LocalRouterHandler { .forward_bytes_local(&address, &caller, data.as_ref()) .boxed_local() } + + fn on_disconnect(&mut self) { + self.disconnect_h.take().map(|f| f()); + } } impl< diff --git a/src/remote_router.rs b/src/remote_router.rs index 1803be1..0efef29 100644 --- a/src/remote_router.rs +++ b/src/remote_router.rs @@ -1,5 +1,6 @@ use actix::{prelude::*, WrapFuture}; use futures::{channel::oneshot, prelude::*, SinkExt}; +use std::ops::Not; use std::{collections::HashSet, time::Duration}; use crate::connection::ClientInfo; @@ -25,14 +26,6 @@ impl Actor for RemoteRouter { fn started(&mut self, ctx: &mut Self::Context) { self.try_connect(ctx); - let _ = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| { - if act.connection.is_none() { - act.clean_pending_calls( - Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())), - ctx, - ); - } - }); } } @@ -41,8 +34,20 @@ impl RemoteRouter { // FIXME: this is `SystemService` and as such cannot get input being initialized // FIXME: but we need to pass gsb_url from yagnad CLI let addr = ya_sb_proto::GsbAddr::default(); - log::info!("trying to connect to: {}", addr); let client_info = self.client_info.clone(); + + log::info!("trying to connect to: {}", addr); + + let timeout_h = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| { + if act.connection.is_none() { + act.clean_pending_calls( + Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())), + ctx, + ); + log::warn!("connection timed out after {:?}", CONNECT_TIMEOUT); + ctx.stop(); + } + }); let connect_fut = connection::transport(addr.clone()) .map_err(move |e| Error::ConnectionFail(addr, e)) .into_actor(self) @@ -51,7 +56,8 @@ impl RemoteRouter { Ok(v) => v, Err(e) => return fut::Either::Left(fut::err(e)), }; - let connection = connection::connect(client_info, transport); + let connection = + connection::connect_with_handler(client_info, transport, act.handler(ctx)); act.connection = Some(connection.clone()); act.clean_pending_calls(Ok(connection.clone()), ctx); fut::Either::Right( @@ -65,9 +71,11 @@ impl RemoteRouter { .into_actor(act), ) }) - .then(|v: Result<(), Error>, _, _| { - if let Err(e) = v { + .then(move |result: Result<(), Error>, _, ctx| { + ctx.cancel_future(timeout_h); + if let Err(e) = result { log::warn!("routing error: {}", e); + ctx.stop(); } fut::ready(()) }); @@ -107,6 +115,24 @@ impl RemoteRouter { }) .right_future() } + + fn handler(&mut self, ctx: &mut ::Context) -> LocalRouterHandler { + let (tx, rx) = oneshot::channel(); + + rx.into_actor(self) + .map(|_, this, ctx| { + this.connection.as_ref().map(|c| { + c.connected().not().then(|| log::warn!("connection lost")); + }); + // restarts the actor + ctx.stop(); + }) + .spawn(ctx); + + LocalRouterHandler::new(|| { + let _ = tx.send(()); + }) + } } impl Default for RemoteRouter { @@ -122,13 +148,7 @@ impl Default for RemoteRouter { impl Supervised for RemoteRouter { fn restarting(&mut self, _ctx: &mut Self::Context) { - if let Some(c) = self.connection.take() { - if c.connected() { - self.connection = Some(c) - } else { - log::error!("lost connection"); - } - } + let _ = self.connection.take(); } }