diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 00ea03aca77..acfc116ac0a 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -357,6 +357,7 @@ impl ClusterCore { /// Send keepalive messages to every othe node. fn keep_alive(data: Arc) { + data.sessions.sessions_keep_alive(); for connection in data.connections.active_connections() { let last_message_diff = time::Instant::now() - connection.last_message_time(); if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) { @@ -460,7 +461,7 @@ impl ClusterCore { } }, _ => { - data.sessions.generation_sessions.get(&session_id) + data.sessions.generation_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -538,7 +539,7 @@ impl ClusterCore { } }, _ => { - data.sessions.encryption_sessions.get(&session_id) + data.sessions.encryption_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -629,7 +630,7 @@ impl ClusterCore { } }, _ => { - data.sessions.decryption_sessions.get(&decryption_session_id) + data.sessions.decryption_sessions.get(&decryption_session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -705,7 +706,7 @@ impl ClusterCore { } }, _ => { - data.sessions.signing_sessions.get(&signing_session_id) + data.sessions.signing_sessions.get(&signing_session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -784,7 +785,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -865,7 +866,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -946,7 +947,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -1029,7 +1030,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -1084,8 +1085,12 @@ impl ClusterCore { /// Process single cluster message from the connection. fn process_cluster_message(data: Arc, connection: Arc, message: ClusterMessage) { match message { - ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {})))), - ClusterMessage::KeepAliveResponse(_) => (), + ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse { + session_id: None, + })))), + ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id { + data.sessions.on_session_keep_alive(connection.node_id(), session_id.into()); + }, _ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()), } } @@ -1459,7 +1464,7 @@ impl ClusterClient for ClusterClientImpl { #[cfg(test)] fn generation_session(&self, session_id: &SessionId) -> Option> { - self.data.sessions.generation_sessions.get(session_id) + self.data.sessions.generation_sessions.get(session_id, false) } } diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index e2f7b062192..f22b6a7623e 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -47,6 +47,8 @@ use key_server_cluster::admin_sessions::ShareChangeSessionMeta; /// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores /// session messages. const SESSION_TIMEOUT_INTERVAL: u64 = 60; +/// Interval to send session-level KeepAlive-messages. +const SESSION_KEEP_ALIVE_INTERVAL: u64 = 30; lazy_static! { /// Servers set change session id (there could be at most 1 session => hardcoded id). @@ -129,6 +131,8 @@ pub struct QueuedSession { pub master: NodeId, /// Cluster view. pub cluster_view: Arc, + /// Last keep alive time. + pub last_keep_alive_time: time::Instant, /// Last received message time. pub last_message_time: time::Instant, /// Generation session. @@ -224,6 +228,18 @@ impl ClusterSessions { self.make_faulty_generation_sessions.store(true, Ordering::Relaxed); } + /// Send session-level keep-alive messages. + pub fn sessions_keep_alive(&self) { + self.admin_sessions.send_keep_alive(&*SERVERS_SET_CHANGE_SESSION_ID, &self.self_node_id); + } + + /// When session-level keep-alive response is received. + pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) { + if session_id == *SERVERS_SET_CHANGE_SESSION_ID { + self.admin_sessions.on_keep_alive(&session_id, sender); + } + } + /// Create new generation session. pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { // check that there's no finished encryption session with the same id @@ -514,9 +530,6 @@ impl ClusterSessions { self.encryption_sessions.stop_stalled_sessions(); self.decryption_sessions.stop_stalled_sessions(); self.signing_sessions.stop_stalled_sessions(); - // TODO: servers set change session could take a lot of time - // && during that session some nodes could not receive messages - // => they could stop session as stalled. This must be handled self.admin_sessions.stop_stalled_sessions(); } @@ -571,8 +584,15 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster self.sessions.read().is_empty() } - pub fn get(&self, session_id: &K) -> Option> { - self.sessions.read().get(session_id).map(|s| s.session.clone()) + pub fn get(&self, session_id: &K, update_last_message_time: bool) -> Option> { + let mut sessions = self.sessions.write(); + sessions.get_mut(session_id) + .map(|s| { + if update_last_message_time { + s.last_message_time = time::Instant::now(); + } + s.session.clone() + }) } pub fn insert Result>(&self, master: NodeId, session_id: K, cluster: Arc, is_exclusive_session: bool, session: F) -> Result, Error> { @@ -590,6 +610,7 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster let queued_session = QueuedSession { master: master, cluster_view: cluster, + last_keep_alive_time: time::Instant::now(), last_message_time: time::Instant::now(), session: session.clone(), queue: VecDeque::new(), @@ -649,6 +670,33 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster } } +impl ClusterSessionsContainer where K: Clone + Ord, V: ClusterSession, SessionId: From { + pub fn send_keep_alive(&self, session_id: &K, self_node_id: &NodeId) { + if let Some(session) = self.sessions.write().get_mut(session_id) { + let now = time::Instant::now(); + if self_node_id == &session.master && now - session.last_keep_alive_time > time::Duration::from_secs(SESSION_KEEP_ALIVE_INTERVAL) { + session.last_keep_alive_time = now; + // since we send KeepAlive message to prevent nodes from disconnecting + // && worst thing that can happen if node is disconnected is that session is failed + // => ignore error here, because probably this node is not need for the rest of the session at all + let _ = session.cluster_view.broadcast(Message::Cluster(message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse { + session_id: Some(session_id.clone().into()), + }))); + } + } + } + + pub fn on_keep_alive(&self, session_id: &K, sender: &NodeId) { + if let Some(session) = self.sessions.write().get_mut(session_id) { + let now = time::Instant::now(); + // we only accept keep alive from master node of ServersSetChange session + if sender == &session.master { + session.last_keep_alive_time = now; + } + } + } +} + impl ClusterSessionsContainerState { /// When session is starting. pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> { diff --git a/secret_store/src/key_server_cluster/message.rs b/secret_store/src/key_server_cluster/message.rs index ce588313bee..8738520275d 100644 --- a/secret_store/src/key_server_cluster/message.rs +++ b/secret_store/src/key_server_cluster/message.rs @@ -255,6 +255,8 @@ pub struct KeepAlive { /// Confirm that the node is still alive. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct KeepAliveResponse { + /// Session id, if used for session-level keep alive. + pub session_id: Option, } /// Initialize new DKG session.