Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect to local router #21

Merged
merged 6 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ya-service-bus"
version = "0.4.3"
version = "0.4.4"
authors = ["Golem Factory <contact@golem.network>"]
edition = "2018"
homepage = "https://github.com/golemfactory/ya-service-bus"
Expand Down
16 changes: 15 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,17 @@ impl ResponseChunk {
}

#[derive(Default)]
pub struct LocalRouterHandler;
pub struct LocalRouterHandler {
disconnect_h: Option<Box<dyn FnOnce()>>,
}

impl LocalRouterHandler {
pub fn new<F: FnOnce() + 'static>(disconnect_fn: F) -> Self {
Self {
disconnect_h: Some(Box::new(disconnect_fn)),
}
}
}

impl CallRequestHandler for LocalRouterHandler {
type Reply = Pin<Box<dyn futures::Stream<Item = Result<ResponseChunk, Error>>>>;
Expand All @@ -107,6 +117,10 @@ impl CallRequestHandler for LocalRouterHandler {
.forward_bytes_local(&address, &caller, data.as_ref())
.boxed_local()
}

fn on_disconnect(&mut self) {
self.disconnect_h.take().map(|f| f());
}
}

impl<
Expand Down
58 changes: 39 additions & 19 deletions src/remote_router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix::{prelude::*, WrapFuture};
use futures::{channel::oneshot, prelude::*, SinkExt};
use std::ops::Not;
use std::{collections::HashSet, time::Duration};

use crate::connection::ClientInfo;
Expand All @@ -25,14 +26,6 @@ impl Actor for RemoteRouter {

fn started(&mut self, ctx: &mut Self::Context) {
self.try_connect(ctx);
let _ = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| {
if act.connection.is_none() {
act.clean_pending_calls(
Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())),
ctx,
);
}
});
}
}

Expand All @@ -41,8 +34,20 @@ impl RemoteRouter {
// FIXME: this is `SystemService` and as such cannot get input being initialized
// FIXME: but we need to pass gsb_url from yagnad CLI
let addr = ya_sb_proto::GsbAddr::default();
log::info!("trying to connect to: {}", addr);
let client_info = self.client_info.clone();

log::info!("trying to connect to: {}", addr);

let timeout_h = ctx.run_later(CONNECT_TIMEOUT, |act, ctx| {
if act.connection.is_none() {
act.clean_pending_calls(
Err(ConnectionTimeout(ya_sb_proto::GsbAddr::default())),
ctx,
);
log::warn!("connection timed out after {:?}", CONNECT_TIMEOUT);
ctx.stop();
}
});
let connect_fut = connection::transport(addr.clone())
.map_err(move |e| Error::ConnectionFail(addr, e))
.into_actor(self)
Expand All @@ -51,7 +56,8 @@ impl RemoteRouter {
Ok(v) => v,
Err(e) => return fut::Either::Left(fut::err(e)),
};
let connection = connection::connect(client_info, transport);
let connection =
connection::connect_with_handler(client_info, transport, act.handler(ctx));
act.connection = Some(connection.clone());
act.clean_pending_calls(Ok(connection.clone()), ctx);
fut::Either::Right(
Expand All @@ -65,9 +71,11 @@ impl RemoteRouter {
.into_actor(act),
)
})
.then(|v: Result<(), Error>, _, _| {
if let Err(e) = v {
.then(move |result: Result<(), Error>, _, ctx| {
ctx.cancel_future(timeout_h);
if let Err(e) = result {
log::warn!("routing error: {}", e);
ctx.stop();
}
fut::ready(())
});
Expand Down Expand Up @@ -107,6 +115,24 @@ impl RemoteRouter {
})
.right_future()
}

fn handler(&mut self, ctx: &mut <Self as Actor>::Context) -> LocalRouterHandler {
let (tx, rx) = oneshot::channel();

rx.into_actor(self)
.map(|_, this, ctx| {
this.connection.as_ref().map(|c| {
c.connected().not().then(|| log::warn!("connection lost"));
});
// restarts the actor
ctx.stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably needs some delay before restart, because it causes market test suite to hang using 100% CPU trying to reconnect in the infinite loop. I've used workaround, but it is dirty
golemfactory/yagna@1a1239a

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reesolved by #23

})
.spawn(ctx);

LocalRouterHandler::new(|| {
let _ = tx.send(());
})
}
}

impl Default for RemoteRouter {
Expand All @@ -122,13 +148,7 @@ impl Default for RemoteRouter {

impl Supervised for RemoteRouter {
fn restarting(&mut self, _ctx: &mut Self::Context) {
if let Some(c) = self.connection.take() {
if c.connected() {
self.connection = Some(c)
} else {
log::error!("lost connection");
}
}
let _ = self.connection.take();
}
}

Expand Down