From bce6958225b1045ec89fea6005f5788fd97f0222 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 4 Oct 2017 13:13:53 +0300 Subject: [PATCH 1/2] SecretStore: session level timeout --- .../src/key_server_cluster/cluster.rs | 27 +++++---- .../key_server_cluster/cluster_sessions.rs | 55 ++++++++++++++++++- .../src/key_server_cluster/message.rs | 2 + 3 files changed, 71 insertions(+), 13 deletions(-) diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index b6107bd8d92..7e9592e9488 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -353,6 +353,7 @@ impl ClusterCore { /// Send keepalive messages to every othe node. fn keep_alive(data: Arc) { + data.sessions.sessions_keep_alive(); for connection in data.connections.active_connections() { let last_message_diff = time::Instant::now() - connection.last_message_time(); if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) { @@ -456,7 +457,7 @@ impl ClusterCore { } }, _ => { - data.sessions.generation_sessions.get(&session_id) + data.sessions.generation_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -534,7 +535,7 @@ impl ClusterCore { } }, _ => { - data.sessions.encryption_sessions.get(&session_id) + data.sessions.encryption_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -625,7 +626,7 @@ impl ClusterCore { } }, _ => { - data.sessions.decryption_sessions.get(&decryption_session_id) + data.sessions.decryption_sessions.get(&decryption_session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -701,7 +702,7 @@ impl ClusterCore { } }, _ => { - data.sessions.signing_sessions.get(&signing_session_id) + data.sessions.signing_sessions.get(&signing_session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -780,7 +781,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + 
data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -861,7 +862,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -942,7 +943,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -1023,7 +1024,7 @@ impl ClusterCore { } }, _ => { - data.sessions.admin_sessions.get(&session_id) + data.sessions.admin_sessions.get(&session_id, true) .ok_or(Error::InvalidSessionId) }, }; @@ -1078,8 +1079,12 @@ impl ClusterCore { /// Process single cluster message from the connection. fn process_cluster_message(data: Arc, connection: Arc, message: ClusterMessage) { match message { - ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {})))), - ClusterMessage::KeepAliveResponse(_) => (), + ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse { + session_id: None, + })))), + ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id { + data.sessions.on_session_keep_alive(connection.node_id(), session_id.into()); + }, _ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()), } } @@ -1447,7 +1452,7 @@ impl ClusterClient for ClusterClientImpl { #[cfg(test)] fn generation_session(&self, session_id: &SessionId) -> Option> { - self.data.sessions.generation_sessions.get(session_id) + self.data.sessions.generation_sessions.get(session_id, false) } } diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs 
index cfc00241d1a..d4eff2ece6c 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -47,6 +47,8 @@ use key_server_cluster::admin_sessions::ShareChangeSessionMeta; /// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores /// session messages. const SESSION_TIMEOUT_INTERVAL: u64 = 60; +/// Interval to send session-level KeepAlive-messages. +const SESSION_KEEP_ALIVE_INTERVAL: u64 = 30; lazy_static! { /// Servers set change session id (there could be at most 1 session => hardcoded id). @@ -127,6 +129,8 @@ pub struct QueuedSession { pub master: NodeId, /// Cluster view. pub cluster_view: Arc, + /// Last keep alive time. + pub last_keep_alive_time: time::Instant, /// Last received message time. pub last_message_time: time::Instant, /// Generation session. @@ -210,6 +214,18 @@ impl ClusterSessions { self.make_faulty_generation_sessions.store(true, Ordering::Relaxed); } + /// Send session-level keep-alive messages. + pub fn sessions_keep_alive(&self) { + self.admin_sessions.send_keep_alive(&*SERVERS_SET_CHANGE_SESSION_ID, &self.self_node_id); + } + + /// When session-level keep-alive response is received. + pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) { + if session_id == *SERVERS_SET_CHANGE_SESSION_ID { + self.admin_sessions.on_keep_alive(&session_id, sender); + } + } + /// Create new generation session. 
pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option, cluster: Arc) -> Result, Error> { // check that there's no finished encryption session with the same id @@ -550,8 +566,15 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster self.sessions.read().is_empty() } - pub fn get(&self, session_id: &K) -> Option> { - self.sessions.read().get(session_id).map(|s| s.session.clone()) + pub fn get(&self, session_id: &K, update_last_message_time: bool) -> Option> { + let mut sessions = self.sessions.write(); + sessions.get_mut(session_id) + .map(|s| { + if update_last_message_time { + s.last_message_time = time::Instant::now(); + } + s.session.clone() + }) } pub fn insert Result>(&self, master: NodeId, session_id: K, cluster: Arc, session: F) -> Result, Error> { @@ -564,6 +587,7 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster let queued_session = QueuedSession { master: master, cluster_view: cluster, + last_keep_alive_time: time::Instant::now(), last_message_time: time::Instant::now(), session: session.clone(), queue: VecDeque::new(), @@ -621,6 +645,33 @@ impl ClusterSessionsContainer where K: Clone + Ord, V: Cluster } } +impl ClusterSessionsContainer where K: Clone + Ord, V: ClusterSession, SessionId: From { + pub fn send_keep_alive(&self, session_id: &K, self_node_id: &NodeId) { + if let Some(session) = self.sessions.write().get_mut(session_id) { + let now = time::Instant::now(); + if self_node_id == &session.master && now - session.last_keep_alive_time > time::Duration::from_secs(SESSION_KEEP_ALIVE_INTERVAL) { + session.last_keep_alive_time = now; + // since we send KeepAlive message to prevent nodes from disconnecting + // && worst thing that can happen if node is disconnected is that session is failed + // => ignore error here, because probably this node is not needed for the rest of the session at all + let _ = 
session.cluster_view.broadcast(Message::Cluster(message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse { + session_id: Some(session_id.clone().into()), + }))); + } + } + } + + pub fn on_keep_alive(&self, session_id: &K, sender: &NodeId) { + if let Some(session) = self.sessions.write().get_mut(session_id) { + let now = time::Instant::now(); + // we only accept keep alive from master node of ServersSetChange session + if sender == &session.master { + session.last_keep_alive_time = now; + } + } + } +} + impl AdminSession { pub fn as_share_add(&self) -> Option<&ShareAddSessionImpl> { match *self { diff --git a/secret_store/src/key_server_cluster/message.rs b/secret_store/src/key_server_cluster/message.rs index 527cf3c2a04..30e7a7302e0 100644 --- a/secret_store/src/key_server_cluster/message.rs +++ b/secret_store/src/key_server_cluster/message.rs @@ -255,6 +255,8 @@ pub struct KeepAlive { /// Confirm that the node is still alive. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct KeepAliveResponse { + /// Session id, if used for session-level keep alive. + pub session_id: Option, } /// Initialize new DKG session. 
From acf59adf71a63f6bac80b806e2933df7cd63d196 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 4 Oct 2017 14:16:59 +0300 Subject: [PATCH 2/2] removed obsolete TODO --- secret_store/src/key_server_cluster/cluster_sessions.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index d4eff2ece6c..08a8d69122a 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -510,9 +510,6 @@ impl ClusterSessions { self.encryption_sessions.stop_stalled_sessions(); self.decryption_sessions.stop_stalled_sessions(); self.signing_sessions.stop_stalled_sessions(); - // TODO: servers set change session could take a lot of time - // && during that session some nodes could not receive messages - // => they could stop session as stalled. This must be handled self.admin_sessions.stop_stalled_sessions(); }