Skip to content

Commit

Permalink
fix: Improve unsynced node warnings (#28)
Browse files Browse the repository at this point in the history
* fix: Use backoff mechanism for tg alerts

* fix: Make thresholds env configurable

* refactor: Re-use previous throttle mechanism

* refactor: last_synced -> last_seen

* fix: Ensure timeout triggers opsgenie alert as before

* feat: Separate throttle period for warnings
  • Loading branch information
ckoopmann authored Aug 5, 2024
1 parent cf0d494 commit 2247565
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 76 deletions.
35 changes: 7 additions & 28 deletions src/phoenix/consensus_node.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,22 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tracing::{error, info, warn};
use tracing::{error, info};

use super::{
alerts::{
telegram::{TelegramAlerts, TelegramSafeAlert},
SendAlert,
},
env::APP_CONFIG,
PhoenixMonitor,
};
use super::{env::APP_CONFIG, PhoenixMonitor};
use crate::beacon_api::BeaconApi;

pub struct ConsensusNodeMonitor {
beacon_api: BeaconApi,
telegram_alerts: TelegramAlerts,
}

impl ConsensusNodeMonitor {
pub fn new() -> Self {
Self {
beacon_api: BeaconApi::new(&APP_CONFIG.consensus_nodes),
telegram_alerts: TelegramAlerts::new(),
}
}

pub async fn get_current_timestamp(&self) -> Result<DateTime<Utc>> {
async fn num_unsynced_nodes(&self) -> usize {
let mut results = Vec::new();

for url in &APP_CONFIG.consensus_nodes {
Expand All @@ -42,26 +32,15 @@ impl ConsensusNodeMonitor {
}

let synced: Vec<&bool> = results.iter().filter(|is_synced| **is_synced).collect();

info!("{}/{} consensus nodes synced", synced.len(), results.len());
let num_out_of_sync = results.len() - synced.len();

if num_out_of_sync > 1 {
Err(anyhow!("multiple consensus nodes out of sync"))
} else {
if num_out_of_sync == 1 {
warn!("one consensus node is out of sync");
let message = TelegramSafeAlert::new("one consensus node is out of sync");
self.telegram_alerts.send_warning(message).await;
}
Ok(Utc::now())
}
results.len() - synced.len()
}
}

#[async_trait]
impl PhoenixMonitor for ConsensusNodeMonitor {
async fn refresh(&self) -> Result<DateTime<Utc>> {
ConsensusNodeMonitor::get_current_timestamp(self).await
async fn refresh(&self) -> (DateTime<Utc>, usize) {
let num_unsynced_nodes = self.num_unsynced_nodes().await;
(Utc::now(), num_unsynced_nodes)
}
}
12 changes: 12 additions & 0 deletions src/phoenix/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ pub struct AppConfig {
pub telegram_warnings_channel_id: String,
#[serde(deserialize_with = "deserialize_urls")]
pub validation_nodes: Vec<Url>,
#[serde(default = "default_unsynced_nodes_threshold_tg_warning")]
pub unsynced_nodes_threshold_tg_warning: usize,
#[serde(default = "default_unsynced_nodes_threshold_og_alert")]
pub unsynced_nodes_threshold_og_alert: usize,
}

/// Serde fallback for `unsynced_nodes_threshold_og_alert`: the value used when
/// the setting is absent from the deserialized configuration.
///
/// An Opsgenie alarm is requested once the unsynced-node count is greater than
/// or equal to this threshold.
fn default_unsynced_nodes_threshold_og_alert() -> usize {
    const DEFAULT_THRESHOLD: usize = 2;
    DEFAULT_THRESHOLD
}

/// Serde fallback for `unsynced_nodes_threshold_tg_warning`: the value used when
/// the setting is absent from the deserialized configuration.
///
/// A Telegram warning is requested once the unsynced-node count is greater than
/// or equal to this threshold.
fn default_unsynced_nodes_threshold_tg_warning() -> usize {
    const DEFAULT_THRESHOLD: usize = 1;
    DEFAULT_THRESHOLD
}

fn default_wait() -> i64 {
Expand Down
72 changes: 51 additions & 21 deletions src/phoenix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,50 +41,82 @@ use self::{
lazy_static! {
// Age limit reported (in seconds) by the "hasn't updated for more than ..." alarm message.
static ref PHOENIX_MAX_LIFESPAN: Duration = Duration::minutes(3);
// Minimum time since the last fired alarm before an Opsgenie alarm may fire again.
static ref MIN_ALARM_WAIT: Duration = Duration::minutes(4);
// Minimum time since the last fired alarm before a Telegram warning may fire again.
static ref MIN_WARNING_WAIT: Duration = Duration::minutes(60);
}

/// Throttled alarm sender: remembers when it last fired and holds the Telegram
/// client used to deliver warnings.
struct Alarm {
/// When an alarm of any type was last sent; `None` until the first one fires.
// NOTE(review): this single timestamp is consulted for both alarm types, so a
// Telegram warning also starts the window that throttles Opsgenie alarms (and an
// Opsgenie alarm starts the longer Telegram window) — confirm this is intended.
last_fired: Option<DateTime<Utc>>,
/// Client used to send Telegram warnings.
telegram_alerts: TelegramAlerts,
}

/// Delivery channel for an alarm; also selects which throttle window applies.
enum AlarmType {
/// Delivered as a Telegram warning; throttled by `MIN_WARNING_WAIT`.
Telegram,
/// Delivered through `alerts::send_opsgenie_telegram_alert`; throttled by `MIN_ALARM_WAIT`.
Opsgenie,
}

impl Alarm {
fn new() -> Self {
Self { last_fired: None }
Self {
last_fired: None,
telegram_alerts: TelegramAlerts::new(),
}
}

fn is_throttled(&self) -> bool {
fn is_throttled(&self, alarm_type: &AlarmType) -> bool {
self.last_fired.map_or(false, |last_fired| {
Utc::now() - last_fired < *MIN_ALARM_WAIT
let min_wait = match alarm_type {
AlarmType::Opsgenie => *MIN_ALARM_WAIT,
AlarmType::Telegram => *MIN_WARNING_WAIT,
};
Utc::now() - last_fired < min_wait
})
}

async fn fire(&mut self, message: &str) {
if self.is_throttled() {
async fn fire(&mut self, message: &str, alarm_type: &AlarmType) {
if self.is_throttled(alarm_type) {
warn!("alarm is throttled, ignoring request to fire alarm");
return;
}

error!(message, "firing alarm");

alerts::send_opsgenie_telegram_alert(message).await;
match alarm_type {
AlarmType::Opsgenie => alerts::send_opsgenie_telegram_alert(message).await,
AlarmType::Telegram => {
self.telegram_alerts
.send_warning(TelegramSafeAlert::new(message))
.await
}
}

self.last_fired = Some(Utc::now());
}

async fn fire_with_name(&mut self, name: &str) {
async fn fire_age_over_limit(&mut self, name: &str) {
let message = format!(
"{} hasn't updated for more than {} seconds",
name,
PHOENIX_MAX_LIFESPAN.num_seconds(),
);
self.fire(&message, &AlarmType::Opsgenie).await;
}

async fn fire_num_unsynced_nodes(&mut self, name: &str, num_unsynced_nodes: usize) {
let message = format!("{} has {} unsynced instances", name, num_unsynced_nodes);

self.fire(&message).await
if num_unsynced_nodes >= APP_CONFIG.unsynced_nodes_threshold_og_alert {
self.fire(&message, &AlarmType::Opsgenie).await;
}
if num_unsynced_nodes >= APP_CONFIG.unsynced_nodes_threshold_tg_warning {
self.fire(&message, &AlarmType::Telegram).await;
}
}
}

/// A monitored target together with the readings taken from its latest refresh.
struct Phoenix {
/// Label interpolated into alarm messages for this target.
name: &'static str,
/// Timestamp for this target; starts at `Utc::now()` and the alarm loop passes
/// each timestamp returned by `refresh` to `set_last_seen`.
last_seen: DateTime<Utc>,
/// Unsynced-node count returned by the most recent `refresh`; starts at 0.
num_unsynced_nodes: usize,
/// Monitor whose `refresh` produces the timestamp and unsynced-node count.
monitor: Box<dyn PhoenixMonitor + Send + Sync>,
}

Expand All @@ -109,7 +141,7 @@ impl Phoenix {

#[async_trait]
trait PhoenixMonitor {
async fn refresh(&self) -> Result<DateTime<Utc>>;
async fn refresh(&self) -> (DateTime<Utc>, usize);
}

async fn run_alarm_loop(last_checked: Arc<Mutex<DateTime<Utc>>>) -> Result<()> {
Expand All @@ -124,10 +156,12 @@ async fn run_alarm_loop(last_checked: Arc<Mutex<DateTime<Utc>>>) -> Result<()> {
Phoenix {
last_seen: Utc::now(),
monitor: Box::new(ConsensusNodeMonitor::new()),
num_unsynced_nodes: 0,
name: "consensus node",
},
Phoenix {
last_seen: Utc::now(),
num_unsynced_nodes: 0,
monitor: Box::new(ValidationNodeMonitor::new()),
name: "validation node",
},
Expand All @@ -136,20 +170,16 @@ async fn run_alarm_loop(last_checked: Arc<Mutex<DateTime<Utc>>>) -> Result<()> {
loop {
for phoenix in phoenixes.iter_mut() {
if phoenix.is_age_over_limit() {
alarm.fire_with_name(phoenix.name).await;
alarm.fire_age_over_limit(phoenix.name).await;
} else {
alarm
.fire_num_unsynced_nodes(phoenix.name, phoenix.num_unsynced_nodes)
.await;
}

let current = phoenix.monitor.refresh().await;
match current {
Ok(current) => phoenix.set_last_seen(current),
Err(err) => {
error!(
name = phoenix.name,
?err,
"failed to refresh phoenix monitor"
);
}
}
let (current, num_unsynced_nodes) = phoenix.monitor.refresh().await;
phoenix.num_unsynced_nodes = num_unsynced_nodes;
phoenix.set_last_seen(current)
}

// Update the last checked time.
Expand Down
34 changes: 7 additions & 27 deletions src/phoenix/validation_node.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde_json::json;
use tracing::{error, info, warn};
use tracing::{error, info};

use super::{
alerts::{
telegram::{TelegramAlerts, TelegramSafeAlert},
SendAlert,
},
env::APP_CONFIG,
PhoenixMonitor,
};
use super::{env::APP_CONFIG, PhoenixMonitor};

#[derive(Deserialize)]
struct SyncResponse {
Expand All @@ -32,18 +24,16 @@ async fn get_sync_status(client: &reqwest::Client, url: String) -> reqwest::Resu

pub struct ValidationNodeMonitor {
client: reqwest::Client,
telegram_alerts: TelegramAlerts,
}

impl ValidationNodeMonitor {
pub fn new() -> Self {
Self {
client: reqwest::Client::new(),
telegram_alerts: TelegramAlerts::new(),
}
}

pub async fn get_current_timestamp(&self) -> Result<DateTime<Utc>> {
pub async fn num_unsynced_nodes(&self) -> usize {
let mut results = Vec::new();

for url in &APP_CONFIG.validation_nodes {
Expand All @@ -61,24 +51,14 @@ impl ValidationNodeMonitor {
let synced: Vec<&bool> = results.iter().filter(|is_synced| **is_synced).collect();

info!("{}/{} validation nodes synced", synced.len(), results.len());
let num_out_of_sync = results.len() - synced.len();

if num_out_of_sync > 1 {
Err(anyhow!("multiple validation nodes out of sync"))
} else {
if num_out_of_sync == 1 {
warn!("one validation node is out of sync");
let message = TelegramSafeAlert::new("one validation node is out of sync");
self.telegram_alerts.send_warning(message).await;
}
Ok(Utc::now())
}
results.len() - synced.len()
}
}

#[async_trait]
impl PhoenixMonitor for ValidationNodeMonitor {
async fn refresh(&self) -> Result<DateTime<Utc>> {
ValidationNodeMonitor::get_current_timestamp(self).await
async fn refresh(&self) -> (DateTime<Utc>, usize) {
let num_unsynced_nodes = self.num_unsynced_nodes().await;
(Utc::now(), num_unsynced_nodes)
}
}

0 comments on commit 2247565

Please sign in to comment.