Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

SecretStore: session level timeout #6631

Merged
merged 3 commits into from
Oct 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions secret_store/src/key_server_cluster/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ impl ClusterCore {

/// Send keepalive messages to every other node.
fn keep_alive(data: Arc<ClusterData>) {
data.sessions.sessions_keep_alive();
for connection in data.connections.active_connections() {
let last_message_diff = time::Instant::now() - connection.last_message_time();
if last_message_diff > time::Duration::from_secs(KEEP_ALIVE_DISCONNECT_INTERVAL) {
Expand Down Expand Up @@ -460,7 +461,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.generation_sessions.get(&session_id)
data.sessions.generation_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -538,7 +539,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.encryption_sessions.get(&session_id)
data.sessions.encryption_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -629,7 +630,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.decryption_sessions.get(&decryption_session_id)
data.sessions.decryption_sessions.get(&decryption_session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -705,7 +706,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.signing_sessions.get(&signing_session_id)
data.sessions.signing_sessions.get(&signing_session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -784,7 +785,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -865,7 +866,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -946,7 +947,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -1027,7 +1028,7 @@ impl ClusterCore {
}
},
_ => {
data.sessions.admin_sessions.get(&session_id)
data.sessions.admin_sessions.get(&session_id, true)
.ok_or(Error::InvalidSessionId)
},
};
Expand Down Expand Up @@ -1082,8 +1083,12 @@ impl ClusterCore {
/// Process single cluster message from the connection.
fn process_cluster_message(data: Arc<ClusterData>, connection: Arc<Connection>, message: ClusterMessage) {
match message {
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {})))),
ClusterMessage::KeepAliveResponse(_) => (),
ClusterMessage::KeepAlive(_) => data.spawn(connection.send_message(Message::Cluster(ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
session_id: None,
})))),
ClusterMessage::KeepAliveResponse(msg) => if let Some(session_id) = msg.session_id {
data.sessions.on_session_keep_alive(connection.node_id(), session_id.into());
},
_ => warn!(target: "secretstore_net", "{}: received unexpected message {} from node {} at {}", data.self_key_pair.public(), message, connection.node_id(), connection.node_address()),
}
}
Expand Down Expand Up @@ -1451,7 +1456,7 @@ impl ClusterClient for ClusterClientImpl {

#[cfg(test)]
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSessionImpl>> {
self.data.sessions.generation_sessions.get(session_id)
self.data.sessions.generation_sessions.get(session_id, false)
}
}

Expand Down
58 changes: 53 additions & 5 deletions secret_store/src/key_server_cluster/cluster_sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ use key_server_cluster::admin_sessions::ShareChangeSessionMeta;
/// This timeout is for cases when node is responding to KeepAlive messages, but intentionally ignores
/// session messages.
const SESSION_TIMEOUT_INTERVAL: u64 = 60;
/// Interval to send session-level KeepAlive-messages.
const SESSION_KEEP_ALIVE_INTERVAL: u64 = 30;

lazy_static! {
/// Servers set change session id (there could be at most 1 session => hardcoded id).
Expand Down Expand Up @@ -129,6 +131,8 @@ pub struct QueuedSession<V, M> {
pub master: NodeId,
/// Cluster view.
pub cluster_view: Arc<Cluster>,
/// Last keep alive time.
pub last_keep_alive_time: time::Instant,
/// Last received message time.
pub last_message_time: time::Instant,
/// Generation session.
Expand Down Expand Up @@ -224,6 +228,18 @@ impl ClusterSessions {
self.make_faulty_generation_sessions.store(true, Ordering::Relaxed);
}

/// Broadcast session-level keep-alive messages for long-running sessions.
///
/// Only admin (servers set change) sessions are kept alive this way — they
/// are identified by the hardcoded `SERVERS_SET_CHANGE_SESSION_ID`.
pub fn sessions_keep_alive(&self) {
	let admin_session_id = &*SERVERS_SET_CHANGE_SESSION_ID;
	self.admin_sessions.send_keep_alive(admin_session_id, &self.self_node_id);
}

/// Handle a session-level keep-alive response received from `sender`.
///
/// Only the servers-set-change (admin) session id is recognized here;
/// keep-alive responses carrying any other session id are ignored.
pub fn on_session_keep_alive(&self, sender: &NodeId, session_id: SessionId) {
	if session_id != *SERVERS_SET_CHANGE_SESSION_ID {
		return;
	}
	self.admin_sessions.on_keep_alive(&session_id, sender);
}

/// Create new generation session.
pub fn new_generation_session(&self, master: NodeId, session_id: SessionId, nonce: Option<u64>, cluster: Arc<Cluster>) -> Result<Arc<GenerationSessionImpl>, Error> {
// check that there's no finished encryption session with the same id
Expand Down Expand Up @@ -507,9 +523,6 @@ impl ClusterSessions {
self.encryption_sessions.stop_stalled_sessions();
self.decryption_sessions.stop_stalled_sessions();
self.signing_sessions.stop_stalled_sessions();
// TODO: servers set change session could take a lot of time
// && during that session some nodes could not receive messages
// => they could stop session as stalled. This must be handled
self.admin_sessions.stop_stalled_sessions();
}

Expand Down Expand Up @@ -564,8 +577,15 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
self.sessions.read().is_empty()
}

pub fn get(&self, session_id: &K) -> Option<Arc<V>> {
self.sessions.read().get(session_id).map(|s| s.session.clone())
/// Look up an active session by id, returning a shared handle to it.
///
/// When `update_last_message_time` is true, the session's `last_message_time`
/// is refreshed so the stalled-session sweep counts this lookup as activity.
pub fn get(&self, session_id: &K, update_last_message_time: bool) -> Option<Arc<V>> {
	let mut guard = self.sessions.write();
	if let Some(queued) = guard.get_mut(session_id) {
		if update_last_message_time {
			queued.last_message_time = time::Instant::now();
		}
		Some(queued.session.clone())
	} else {
		None
	}
}

pub fn insert<F: FnOnce() -> Result<V, Error>>(&self, master: NodeId, session_id: K, cluster: Arc<Cluster>, is_exclusive_session: bool, session: F) -> Result<Arc<V>, Error> {
Expand All @@ -583,6 +603,7 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
let queued_session = QueuedSession {
master: master,
cluster_view: cluster,
last_keep_alive_time: time::Instant::now(),
last_message_time: time::Instant::now(),
session: session.clone(),
queue: VecDeque::new(),
Expand Down Expand Up @@ -642,6 +663,33 @@ impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: Cluster
}
}

impl<K, V, M> ClusterSessionsContainer<K, V, M> where K: Clone + Ord, V: ClusterSession, SessionId: From<K> {
	/// Broadcast a session-level keep-alive for `session_id`, provided this
	/// node is the session master and the keep-alive interval has elapsed.
	pub fn send_keep_alive(&self, session_id: &K, self_node_id: &NodeId) {
		let mut sessions = self.sessions.write();
		let session = match sessions.get_mut(session_id) {
			Some(session) => session,
			None => return,
		};
		let now = time::Instant::now();
		let is_master = self_node_id == &session.master;
		let interval_elapsed = now - session.last_keep_alive_time > time::Duration::from_secs(SESSION_KEEP_ALIVE_INTERVAL);
		if is_master && interval_elapsed {
			session.last_keep_alive_time = now;
			// The keep-alive only exists to stop other nodes from disconnecting;
			// the worst outcome of a failed broadcast is a failed session on a
			// node that may no longer be needed => the send error is deliberately
			// ignored here.
			let _ = session.cluster_view.broadcast(Message::Cluster(message::ClusterMessage::KeepAliveResponse(message::KeepAliveResponse {
				session_id: Some(session_id.clone().into()),
			})));
		}
	}

	/// Record a session-level keep-alive received from `sender`.
	///
	/// Only keep-alives originating from the session master (the master of a
	/// ServersSetChange session) are accepted; others are silently dropped.
	pub fn on_keep_alive(&self, session_id: &K, sender: &NodeId) {
		if let Some(session) = self.sessions.write().get_mut(session_id) {
			if sender == &session.master {
				session.last_keep_alive_time = time::Instant::now();
			}
		}
	}
}

impl ClusterSessionsContainerState {
/// When session is starting.
pub fn on_session_starting(&mut self, is_exclusive_session: bool) -> Result<(), Error> {
Expand Down
2 changes: 2 additions & 0 deletions secret_store/src/key_server_cluster/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ pub struct KeepAlive {
/// Confirm that the node is still alive.
///
/// Also doubles as the session-level keep-alive message: when `session_id`
/// is set, the receiver refreshes that session's keep-alive timestamp
/// instead of treating this as a plain connection-level response.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KeepAliveResponse {
	/// Session id, if used for session-level keep alive; `None` for a plain
	/// connection-level keep-alive response.
	pub session_id: Option<MessageSessionId>,
}

/// Initialize new DKG session.
Expand Down