
SecretStore: session level timeout (#6631)
* SecretStore: session level timeout

* removed obsolete TODO
svyatonik authored and NikVolf committed Oct 5, 2017
1 parent 6431459 commit 4e9d439
Showing 3 changed files with 71 additions and 16 deletions.
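For orientation before the per-file diffs: a minimal, hypothetical sketch of the session-level keep-alive idea this commit introduces. The master of a long-running session (here, the servers-set-change / admin session) periodically broadcasts a KeepAliveResponse carrying the session id, and the other nodes refresh a per-session keep-alive timestamp when they hear from the master, so that a long but healthy session is not killed by the session timeout. The types and names in the sketch (SessionEntry, Sessions, should_send_keep_alive) are simplified stand-ins for illustration, not the actual SecretStore API.

use std::collections::HashMap;
use std::time::{Duration, Instant};

// Mirrors SESSION_KEEP_ALIVE_INTERVAL from the diff below.
const SESSION_KEEP_ALIVE_INTERVAL: u64 = 30;

// Simplified per-session record (stand-in for QueuedSession in the diff below).
struct SessionEntry {
    master: u64,                   // node id of the session master, reduced to u64 here
    last_keep_alive_time: Instant, // refreshed by session-level keep-alives
}

struct Sessions {
    self_node_id: u64,
    sessions: HashMap<u64, SessionEntry>, // session id -> entry, reduced to u64 keys here
}

impl Sessions {
    // Master side: returns true when a session-level keep-alive should be broadcast now,
    // i.e. this node is the session master and the send interval has elapsed.
    fn should_send_keep_alive(&mut self, session_id: u64, now: Instant) -> bool {
        let self_node_id = self.self_node_id;
        if let Some(s) = self.sessions.get_mut(&session_id) {
            if s.master == self_node_id
                && now - s.last_keep_alive_time > Duration::from_secs(SESSION_KEEP_ALIVE_INTERVAL)
            {
                s.last_keep_alive_time = now;
                return true; // caller then broadcasts KeepAliveResponse { session_id: Some(..) }
            }
        }
        false
    }

    // Non-master side: refresh the keep-alive timestamp when a keep-alive arrives from
    // the session master, so the timeout logic can see the session is still alive.
    fn on_keep_alive(&mut self, session_id: u64, sender: u64, now: Instant) {
        if let Some(s) = self.sessions.get_mut(&session_id) {
            if sender == s.master {
                s.last_keep_alive_time = now;
            }
        }
    }
}

In the actual change below, the broadcast happens in ClusterSessionsContainer::send_keep_alive (driven by the cluster-level keep_alive loop via sessions_keep_alive), and incoming KeepAliveResponse messages that carry a session_id are routed to on_session_keep_alive.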
27 changes: 16 additions & 11 deletions secret_store/src/key_server_cluster/cluster.rs
@@ -357,6 +357,7 @@ impl ClusterCore {

/// Send keepalive messages to every other node.
fn keep_alive(data: Arc<ClusterData>) {
data.sessions.sessions_keep_alive();
for connection in data.connections.active_connections() {
let last_message_diff = time::Instant::now() - connection.last_message_time();
if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) {
@@ -460,7 +461,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.generation_sessions.get(&session_id)
data.sessions.generation_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -538,7 +539,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.encryption_sessions.get(&session_id)
data.sessions.encryption_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -629,7 +630,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.decryption_sessions.get(&decryption_session_id)
data.sessions.decryption_sessions.get(&decryption_session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -705,7 +706,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.signing_sessions.get(&signing_session_id)
data.sessions.signing_sessions.get(&signing_session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -784,7 +785,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -865,7 +866,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -946,7 +947,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -1029,7 +1030,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
@@ -1084,8 +1085,12 @@ impl ClusterCore {
/// Process single cluster message from the connection.
fn process_cluster_message(data: Arc<ClusterData>, connection: Arc<Connection>, message: ClusterMessage) {
match message {
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {})))),
ClusterMessage::KeepAliveResponse(_) => (),
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: None,
})))),
ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id {
data.sessions.on_session_keep_alive(connection.node_id(), session_id.into());
},
_ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()),
}
}
@@ -1459,7 +1464,7 @@ impl ClusterClient for ClusterClientImpl {

#[cfg(test)]
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSessionImpl>> {
self.data.sessions.generation_sessions.get(session_id)
self.data.sessions.generation_sessions.get(session_id, false)
}
}

58 changes: 53 additions & 5 deletions secret_store/src/key_server_cluster/cluster_sessions.rs
@@ -47,6 +47,8 @@ use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
/// This timeout is for cases when a node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const SESSION_TIMEOUT_INTERVAL: u64 = 60;
/// Interval between session-level KeepAlive messages.
const SESSION_KEEP_ALIVE_INTERVAL: u64 = 30;

lazy_static! {
/// Servers set change session id (there could be at most 1 session => hardcoded id).
@@ -129,6 +131,8 @@ pub struct QueuedSession<V, M> {
pub master: NodeId,
/// Cluster view.
pub cluster_view: Arc<Cluster>,
/// Last keep alive time.
pub last_keep_alive_time: time::Instant,
/// Last received message time.
pub last_message_time: time::Instant,
/// Generation session.
@@ -224,6 +228,18 @@ impl ClusterSessions {
self.make_faulty_generation_sessions.store(true, Ordering::Relaxed);
}

/// Send session-level keep-alive messages.
pub fn sessions_keep_alive(&self) {
self.admin_sessions.send_keep_alive(&*SERVERS_SET_CHANGE_SESSION_ID, &self.self_node_id);
}

/// When session-level keep-alive response is received.
pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) {
if session_id == *SERVERS_SET_CHANGE_SESSION_ID {
self.admin_sessions.on_keep_alive(&session_id, sender);
}
}

/// Create new generation session.
pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<GenerationSessionImpl>, Error> {
// check that there's no finished encryption session with the same id
@@ -514,9 +530,6 @@ impl ClusterSessions {
self.encryption_sessions.stop_stalled_sessions();
self.decryption_sessions.stop_stalled_sessions();
self.signing_sessions.stop_stalled_sessions();
// TODO: servers set change session could take a lot of time
// && during that session some nodes could not receive messages
// => they could stop session as stalled. This must be handled
self.admin_sessions.stop_stalled_sessions();
}

@@ -571,8 +584,15 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
self.sessions.read().is_empty()
}

pub fn get(&self, session_id: &K) -> Option<Arc<V>> {
self.sessions.read().get(session_id).map(|s| s.session.clone())
pub fn get(&self, session_id: &K, update_last_message_time: bool) -> Option<Arc<V>> {
let mut sessions = self.sessions.write();
sessions.get_mut(session_id)
.map(|s| {
if update_last_message_time {
s.last_message_time = time::Instant::now();
}
s.session.clone()
})
}

pub fn insert<F: FnOnce() -> Result<V, Error>>(&self, master: NodeId, session_id: K, cluster: Arc<Cluster>, is_exclusive_session: bool, session: F) -> Result<Arc<V>, Error> {
@@ -590,6 +610,7 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
let queued_session = QueuedSession {
master: master,
cluster_view: cluster,
last_keep_alive_time: time::Instant::now(),
last_message_time: time::Instant::now(),
session: session.clone(),
queue: VecDeque::new(),
@@ -649,6 +670,33 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
}
}

impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: ClusterSession, SessionId: From<K> {
pub fn send_keep_alive(&self, session_id: &K, self_node_id: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = time::Instant::now();
if self_node_id == &session.master && now - session.last_keep_alive_time > time::Duration::from_secs(SESSION_KEEP_ALIVE_INTERVAL) {
session.last_keep_alive_time = now;
// since we send the KeepAlive message to prevent nodes from disconnecting
// && the worst that can happen if a node is disconnected is that the session fails
// => ignore the error here, because this node is probably not needed for the rest of the session at all
let _ = session.cluster_view.broadcast(Message::Cluster(message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: Some(session_id.clone().into()),
})));
}
}
}

pub fn on_keep_alive(&self, session_id: &K, sender: &NodeId) {
if let Some(session) = self.sessions.write().get_mut(session_id) {
let now = time::Instant::now();
// we only accept keep alive from master node of ServersSetChange session
if sender == &session.master {
session.last_keep_alive_time = now;
}
}
}
}

impl ClusterSessionsContainerState {
/// When session is starting.
pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> {
2 changes: 2 additions & 0 deletions secret_store/src/key_server_cluster/message.rs
@@ -255,6 +255,8 @@ pub struct KeepAlive {
/// Confirm that the node is still alive.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeepAliveResponse {
/// Session id, if used for session-level keep alive.
pub session_id: Option<MessageSessionId>,
}

/// Initialize new DKG session.
