diff --git a/Cargo.lock b/Cargo.lock index 784ca01..b7ee568 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -987,7 +987,7 @@ checksum = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b" [[package]] name = "primitives" version = "0.1.0" -source = "git+https://github.com/AdExNetwork/adex-validator-stack-rust?branch=clean-up-marketing-campaign#e568c8447f84681174567b2392e477462eb3c96b" +source = "git+https://github.com/AdExNetwork/adex-validator-stack-rust?branch=dev#74618d47a4b170a128732dc6b92f3eedcf2bf02d" dependencies = [ "chrono", "fake", diff --git a/Cargo.toml b/Cargo.toml index 78f7e5f..5e7a593 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Lachezar Lechev "] edition = "2018" [dependencies] -primitives = { git = "https://github.com/AdExNetwork/adex-validator-stack-rust", branch = "clean-up-marketing-campaign" } +primitives = { git = "https://github.com/AdExNetwork/adex-validator-stack-rust", branch = "dev" } chrono = { version = "0.4" } # Server tokio = { version = "0.2", features = ["macros", "rt-threaded", "sync"] } diff --git a/src/status.rs b/src/status.rs index 2dd8517..92733f9 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,9 +1,9 @@ use crate::sentry_api::SentryApi; use chrono::{DateTime, Duration, Utc}; use primitives::{ - sentry::{HeartbeatValidatorMessage, LastApproved, LastApprovedResponse}, - validator::{Heartbeat, MessageTypes}, - BalancesMap, BigNum, Channel, + sentry::{HeartbeatValidatorMessage, LastApprovedResponse}, + validator::MessageTypes, + BalancesMap, BigNum, Channel, ValidatorId, }; use reqwest::Error; @@ -31,9 +31,100 @@ pub enum Finalized { } struct Messages { - last_approved: Option, - leader_heartbeats: Vec, - follower_heartbeats: Vec, + leader: LastApprovedResponse, + follower: LastApprovedResponse, + recency: Duration, +} + +impl Messages { + fn has_leader_hb(&self) -> bool { + self.leader + .heartbeats + .as_ref() + .map(|heartbeats| heartbeats.len() > 0) + .unwrap_or(false) + } + + fn has_follower_hb(&self) -> bool { + self.follower + .heartbeats + .as_ref() + .map(|heartbeats| heartbeats.len() > 0) + .unwrap_or(false) + } + + fn has_follower_approve_state(&self) -> bool { + self.follower + .last_approved + .as_ref() + .map(|last_approved| last_approved.approve_state.is_some()) + .unwrap_or(false) + } + + fn has_recent_follower_hb(&self) -> bool { + self.follower + .heartbeats + .as_ref() + .map(|heartbeats| self.has_recent_heartbeat_from(&heartbeats, None)) + .unwrap_or(false) + } + + fn has_recent_leader_hb_from(&self, validator: &ValidatorId) -> bool { + self.leader + .heartbeats + .as_ref() + .map(|heartbeats| self.has_recent_heartbeat_from(heartbeats, Some(validator))) + .unwrap_or(false) + } + + + fn has_recent_follower_hb_from(&self, validator: &ValidatorId) -> bool { + self.follower + .heartbeats + .as_ref() + .map(|heartbeats| self.has_recent_heartbeat_from(heartbeats, Some(validator))) + .unwrap_or(false) + } + + fn has_recent_leader_hb(&self) -> bool { + self.leader + .heartbeats + .as_ref() + .map(|heartbeats| self.has_recent_heartbeat_from(&heartbeats, None)) + .unwrap_or(false) + } + + fn has_leader_new_state(&self) -> bool { + self.leader + .last_approved + .as_ref() + .map(|last_approved| last_approved.new_state.is_some()) + .unwrap_or(false) + } + + /// `from`: If `None` it will just check for a recent Heartbeat + fn has_recent_heartbeat_from( + &self, + heartbeats: &[HeartbeatValidatorMessage], + from: Option<&ValidatorId>, + ) -> bool { + heartbeats + .iter() + .any(|heartbeat_msg| match (from, &heartbeat_msg.msg) { + (Some(from), MessageTypes::Heartbeat(heartbeat)) + if &heartbeat_msg.from == from + && is_date_recent(self.recency, &heartbeat.timestamp) => + { + true + } + (None, MessageTypes::Heartbeat(heartbeat)) + if is_date_recent(self.recency, &heartbeat.timestamp) => + { + true + } + _ => false, + }) + } } #[derive(Debug, PartialEq, Eq)] @@ -106,7 +197,9 @@ pub async fn is_finalized(sentry: &SentryApi, channel: &Channel) -> Result Result { @@ -121,21 +214,21 @@ pub async fn get_status(sentry: &SentryApi, channel: &Channel) -> Result Result>) -> Vec { - match heartbeats { - Some(heartbeats) => heartbeats - .into_iter() - .filter_map(|heartbeat| match heartbeat.msg { - MessageTypes::Heartbeat(heartbeat) => Some(heartbeat), - _ => None, - }) - .collect(), - None => Default::default(), - } +/// there are no messages at all for at least one validator +fn is_initializing(messages: &Messages) -> bool { + (!messages.has_leader_hb() && !messages.has_leader_new_state()) + || (!messages.has_follower_hb() && !messages.has_follower_approve_state()) } -fn is_initializing() -> bool { - todo!() +/// at least one validator doesn't have a recent Heartbeat message +fn is_offline(messages: &Messages) -> bool { + !messages.has_recent_leader_hb() || !messages.has_recent_follower_hb() } -// at least one validator doesn't have a recent Heartbeat message -fn is_offline(leader: &[Heartbeat], follower: &[Heartbeat]) -> bool { - // @TODO: Move to configuration - let recency = Duration::minutes(4); - - !leader - .iter() - .any(|h| is_date_recent(&recency, &h.timestamp)) - || !follower - .iter() - .any(|h| is_date_recent(&recency, &h.timestamp)) +fn is_date_recent(recency: Duration, date: &DateTime) -> bool { + date >= &(Utc::now() - recency) } -fn is_date_recent(recency: &Duration, date: &DateTime) -> bool { - todo!() -} +/// validators have recent Heartbeat messages, but they don't seem to be propagating messages between one another (the majority of Heartbeats are not found on both validators) +fn is_disconnected(channel: &Channel, messages: &Messages) -> bool { + let leader = &channel.spec.validators.leader().id; + let follower = &channel.spec.validators.follower().id; -fn is_disconnected() -> bool { - todo!() + !(messages.has_recent_leader_hb_from(follower) && messages.has_recent_follower_hb_from(leader)) } fn is_rejected_state() -> bool { @@ -236,8 +314,12 @@ fn is_ready() -> bool { mod test { use super::*; use httptest::{mappers::*, responders::*, Expectation, Server, ServerPool}; - use primitives::util::tests::prep_db::{ - DUMMY_CHANNEL, DUMMY_VALIDATOR_FOLLOWER, DUMMY_VALIDATOR_LEADER, + use primitives::{ + util::tests::prep_db::{ + DUMMY_CHANNEL, DUMMY_VALIDATOR_FOLLOWER, DUMMY_VALIDATOR_LEADER, + }, + sentry::{ApproveStateValidatorMessage, NewStateValidatorMessage, LastApproved}, + validator::{ApproveState, NewState, Heartbeat} }; static SERVER_POOL: ServerPool = ServerPool::new(4); @@ -255,6 +337,42 @@ mod test { channel } + fn get_approve_state_msg() -> ApproveStateValidatorMessage { + ApproveStateValidatorMessage { + from: DUMMY_VALIDATOR_LEADER.id, + received: Utc::now(), + msg: MessageTypes::ApproveState(ApproveState { + state_root: String::from("0x0"), + signature: String::from("0x0"), + is_healthy: true, + }), + } + } + + fn get_heartbeat_msg(recency: Duration, validator_id: ValidatorId) -> HeartbeatValidatorMessage { + HeartbeatValidatorMessage { + from: validator_id, + received: Utc::now() - recency, + msg: MessageTypes::Heartbeat(Heartbeat { + signature: String::from("0x0"), + state_root: String::from("0x0"), + timestamp: Utc::now() - recency, + }), + } + } + + fn get_new_state_msg() -> NewStateValidatorMessage { + NewStateValidatorMessage { + from: DUMMY_VALIDATOR_LEADER.id, + received: Utc::now(), + msg: MessageTypes::NewState(NewState { + signature: String::from("0x0"), + state_root: String::from("0x0"), + balances: Default::default(), + }), + } + } + #[tokio::test] async fn test_is_finalized_when_expired() { let server = SERVER_POOL.get_server(); @@ -309,9 +427,345 @@ mod test { #[test] fn is_offline_no_heartbeats() { + let messages = Messages { + leader: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + follower: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + recency: Duration::minutes(4), + }; + + assert!( + is_offline(&messages), + "On empty heartbeat it should be offline!" + ); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: None, + heartbeats: None, + }, + follower: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + recency: Duration::minutes(4), + }; + + assert!( + is_offline(&messages), + "On empty heartbeat it should be offline!" + ); + } + + // is_date_recent() + #[test] + fn now_date_is_recent() { + let now = Utc::now(); + let recency = Duration::minutes(4); + assert!( + is_date_recent(recency, &now), + "The present moment is a recent date!" + ) + } + + #[test] + fn slightly_past_is_recent() { + let recency = Duration::minutes(4); + let on_the_edge = Utc::now().checked_sub_signed(Duration::minutes(3)).unwrap(); assert!( - is_offline(&[], &[]), - "On empty heartbeast it should be offline!" + is_date_recent(recency, &on_the_edge), + "When date is just as old as the recency limit, it still counts as recent" + ) + } + + #[test] + fn old_date_is_not_recent() { + let recency = Duration::minutes(4); + let past = Utc::now() - Duration::minutes(10); + assert_eq!( + is_date_recent(recency, &past), + false, + "Date older than the recency limit is not recent" + ) + } + + // is_initializing() + #[test] + fn two_empty_message_arrays() { + let messages = Messages { + leader: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + follower: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_initializing(&messages), + true, + "Both leader heartbeats + newstate and follower heatbeats + approvestate pairs are empty arrays" + ) + } + + #[test] + fn leader_has_no_messages() { + let channel = DUMMY_CHANNEL.clone(); + let approve_state = get_approve_state_msg(); + let heartbeat = get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id); + let messages = Messages { + leader: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: Some(approve_state), + }), + heartbeats: Some(vec![heartbeat]), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_initializing(&messages), + true, + "Leader has no new messages but the follower has heartbeats and approvestate" + ) + } + + #[test] + fn follower_has_no_messages() { + let channel = DUMMY_CHANNEL.clone(); + let heartbeat = get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id); + let new_state = get_new_state_msg(); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: Some(new_state), + approve_state: None, + }), + heartbeats: Some(vec![heartbeat]), + }, + follower: LastApprovedResponse { + last_approved: None, + heartbeats: Some(vec![]), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_initializing(&messages), + true, + "Follower has no new messages but leader has heartbeats and newstate" + ) + } + + #[test] + fn both_arrays_have_messages() { + let channel = DUMMY_CHANNEL.clone(); + let leader_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id)]; + let follower_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id)]; + let new_state = get_new_state_msg(); + let approve_state = get_approve_state_msg(); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: Some(new_state), + approve_state: None, + }), + heartbeats: Some(leader_heartbeats), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: Some(approve_state), + }), + heartbeats: Some(follower_heartbeats), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_initializing(&messages), + false, + "Both arrays have messages" + ) + } + + // is_disconnected() + #[test] + fn no_recent_hbs_on_both_sides() { + let channel = DUMMY_CHANNEL.clone(); + let leader_heartbeats = vec![get_heartbeat_msg(Duration::minutes(10), channel.spec.validators.leader().id)]; + let follower_heartbeats = vec![get_heartbeat_msg(Duration::minutes(10), channel.spec.validators.leader().id)]; + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: None, + }), + heartbeats: Some(leader_heartbeats), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: None, + }), + heartbeats: Some(follower_heartbeats), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_disconnected(&channel, &messages), + true, + "Both leader and follower heartbeats have no recent messages" + ) + } + + #[test] + fn no_recent_follower_hbs() { + let channel = DUMMY_CHANNEL.clone(); + let leader_heartbeats = vec![get_heartbeat_msg(Duration::minutes(1), channel.spec.validators.leader().id), get_heartbeat_msg(Duration::minutes(1), channel.spec.validators.follower().id)]; + let follower_heartbeats = vec![get_heartbeat_msg(Duration::minutes(10), channel.spec.validators.leader().id), get_heartbeat_msg(Duration::minutes(10), channel.spec.validators.follower().id)]; + let new_state = get_new_state_msg(); + let approve_state = get_approve_state_msg(); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: Some(new_state), + approve_state: None, + }), + heartbeats: Some(leader_heartbeats), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: Some(approve_state), + }), + heartbeats: Some(follower_heartbeats), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_disconnected(&channel, &messages), + true, + "No recent heartbeats on both validators" + ) + } + + #[test] + fn no_hb_in_leader_where_from_points_to_follower() { + let channel = DUMMY_CHANNEL.clone(); + let leader_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id)]; + let follower_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id), get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.follower().id)]; + let new_state = get_new_state_msg(); + let approve_state = get_approve_state_msg(); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: Some(new_state), + approve_state: None, + }), + heartbeats: Some(leader_heartbeats), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: Some(approve_state), + }), + heartbeats: Some(follower_heartbeats), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_disconnected(&channel, &messages), + true, + "Leader validator heartbeats have no recent messages that came from the follower" + ) + } + + #[test] + fn no_hb_in_follower_where_from_points_to_leader() { + let channel = DUMMY_CHANNEL.clone(); + let leader_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id), get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id)]; + let follower_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.follower().id)]; + + let new_state = get_new_state_msg(); + let approve_state = get_approve_state_msg(); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: Some(new_state), + approve_state: None, + }), + heartbeats: Some(leader_heartbeats), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: Some(approve_state), + }), + heartbeats: Some(follower_heartbeats), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_disconnected(&channel, &messages), + true, + "Follower validator heartbeats have no recent messages that came from the leader" + ) + } + + #[test] + fn recent_hbs_coming_from_both_validators() { + let channel = DUMMY_CHANNEL.clone(); + let leader_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id), get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.follower().id)]; + let follower_heartbeats = vec![get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.leader().id), get_heartbeat_msg(Duration::minutes(0), channel.spec.validators.follower().id)]; + let new_state = get_new_state_msg(); + let approve_state = get_approve_state_msg(); + + let messages = Messages { + leader: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: Some(new_state), + approve_state: None, + }), + heartbeats: Some(leader_heartbeats), + }, + follower: LastApprovedResponse { + last_approved: Some(LastApproved { + new_state: None, + approve_state: Some(approve_state), + }), + heartbeats: Some(follower_heartbeats), + }, + recency: Duration::minutes(4), + }; + + assert_eq!( + is_disconnected(&channel, &messages), + false, + "Leader hb has recent messages that came from the follower, and the follower has recent messages that came from the leader" ) }