From b5dc13f69ea8580fa5afb6c9fdcd8e4dc2955bfe Mon Sep 17 00:00:00 2001 From: Pavel Perestoronin Date: Thu, 21 Nov 2024 00:30:35 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=A5=85=20Do=20not=20report=20failures=20t?= =?UTF-8?q?o=20Better=20Stack=20to=20avoid=20flakiness?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cli.rs | 4 +-- src/client.rs | 6 ---- src/heartbeat.rs | 69 ++++++------------------------------------ src/main.rs | 4 +-- src/marktplaats/bot.rs | 11 +++---- src/telegram.rs | 10 ++---- 6 files changed, 21 insertions(+), 83 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index eb4727b..119e2ed 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -43,7 +43,7 @@ pub struct MarktplaatsArgs { )] pub search_limit: u32, - /// Better Stack heartbeat URL for the Marktplaats crawler. + /// Heartbeat URL for the Marktplaats crawler. #[clap( long = "marktplaats-heartbeat-url", env = "MARKTPLAATS_HEARTBEAT_URL", @@ -79,7 +79,7 @@ pub struct TelegramArgs { )] pub authorized_chat_ids: Vec, - /// Better Stack heartbeat URL for the Telegram bot. + /// Heartbeat URL for the Telegram bot. #[clap( long = "telegram-heartbeat-url", env = "TELEGRAM_HEARTBEAT_URL", diff --git a/src/client.rs b/src/client.rs index 50492ae..f7bee3c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,6 @@ use std::{any::type_name, time::Duration}; use clap::crate_version; use reqwest::{ - Body, IntoUrl, Method, header, @@ -55,11 +54,6 @@ impl RequestBuilder { self.0.try_clone().context("failed to clone the request builder").map(Self) } - /// Send a body. - pub fn body(self, body: impl Into) -> Self { - Self(self.0.body(body)) - } - /// Send a JSON body. pub fn json(self, json: &R) -> Self { Self(self.0.json(json)) diff --git a/src/heartbeat.rs b/src/heartbeat.rs index 3f81ee4..da8be3f 100644 --- a/src/heartbeat.rs +++ b/src/heartbeat.rs @@ -6,71 +6,22 @@ use crate::{client::Client, prelude::*}; pub struct Heartbeat<'a>(Option>); impl<'a> Heartbeat<'a> { - pub fn try_new(client: &'a Client, success_url: Option) -> Result { - success_url - .map(|success_url| HeartbeatInner::try_new(client, success_url)) - .transpose() - .map(Self) + pub fn new(client: &'a Client, url: Option) -> Self { + Self(url.map(|url| HeartbeatInner { client, url })) } - pub async fn report_success(&self) { - let Some(inner) = &self.0 else { - return; + pub async fn check_in(&self) { + if let Some(inner) = &self.0 { + if let Err(error) = + inner.client.request(Method::POST, inner.url.clone()).read_text(true).await + { + warn!("Failed to send the heartbeat: {error:#}"); + } }; - let success_url = inner.success_url.clone(); - if let Err(error) = inner.client.request(Method::POST, success_url).read_text(true).await { - warn!("Failed to send the heartbeat: {error:#}"); - } - } - - pub async fn report_failure(&self, error: &Error) { - let Some(inner) = &self.0 else { - return; - }; - let failure_url = inner.failure_url.clone(); - if let Err(error) = inner - .client - .request(Method::POST, failure_url) - .body(error.to_string()) - .read_text(true) - .await - { - warn!("Failed to report the failure: {error:#}"); - } } } struct HeartbeatInner<'a> { client: &'a Client, - success_url: Url, - failure_url: Url, -} - -impl<'a> HeartbeatInner<'a> { - fn try_new(client: &'a Client, success_url: Url) -> Result { - let mut failure_url = success_url.clone(); - failure_url - .path_segments_mut() - .map_err(|_| anyhow!("could not add a segment to `{success_url}`"))? - .push("fail"); - Ok(Self { client, success_url, failure_url }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn try_new_ok() -> Result { - let client = Client::try_new()?; - let url = Url::parse("https://uptime.betterstack.com/api/v1/heartbeat/XYZ1234")?; - let heartbeat = Heartbeat::try_new(&client, Some(url))?; - let inner = heartbeat.0.expect("inner should be `Some`"); - assert_eq!( - inner.failure_url, - Url::parse("https://uptime.betterstack.com/api/v1/heartbeat/XYZ1234/fail")?, - ); - Ok(()) - } + url: Url, } diff --git a/src/main.rs b/src/main.rs index 9242633..ef116ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ async fn async_main(cli: Args) -> Result { let command_builder = telegram::bot::try_init(&telegram).await?; // Handle Telegram updates: - let telegram_heartbeat = Heartbeat::try_new(&client, cli.telegram.heartbeat_url)?; + let telegram_heartbeat = Heartbeat::new(&client, cli.telegram.heartbeat_url); let telegram_updates = telegram.clone().into_updates(0, cli.telegram.poll_timeout_secs, &telegram_heartbeat); let telegram_reactor = telegram::bot::Reactor::builder() @@ -66,7 +66,7 @@ async fn async_main(cli: Args) -> Result { .command_builder(&command_builder) .search_limit(cli.marktplaats.search_limit) .build(); - let marktplaats_heartbeat = Heartbeat::try_new(&client, cli.marktplaats.heartbeat_url)?; + let marktplaats_heartbeat = Heartbeat::new(&client, cli.marktplaats.heartbeat_url); let marktplaats_reactions = marktplaats_reactor.run(&marktplaats_heartbeat); // Now, merge all the reactions and execute them: diff --git a/src/marktplaats/bot.rs b/src/marktplaats/bot.rs index c7989f6..8a88f17 100644 --- a/src/marktplaats/bot.rs +++ b/src/marktplaats/bot.rs @@ -47,17 +47,14 @@ impl<'s> Reactor<'s> { .try_filter_map(|entry| async { if entry.is_none() { info!("No subscriptions found"); - heartbeat.report_success().await; + heartbeat.check_in().await; } Ok(entry) }) .and_then(move |(subscription, search_query)| async move { - let result = self.on_subscription(subscription, search_query).await; - match &result { - Ok(_) => heartbeat.report_success().await, - Err(error) => heartbeat.report_failure(error).await, - } - Ok(stream::iter(result?).map(Ok)) + let reactions = self.on_subscription(subscription, search_query).await?; + heartbeat.check_in().await; + Ok(stream::iter(reactions).map(Ok)) }) .try_flatten() } diff --git a/src/telegram.rs b/src/telegram.rs index 1c3f69e..9e8f2db 100644 --- a/src/telegram.rs +++ b/src/telegram.rs @@ -99,18 +99,14 @@ impl Telegram { heartbeat: &'a Heartbeat<'a>, ) -> impl Stream> + 'a { let advance = move |(this, offset)| async move { - let updates_result = GetUpdates::builder() + let updates = GetUpdates::builder() .offset(offset) .timeout_secs(poll_timeout_secs) .allowed_updates(&[AllowedUpdate::Message]) .build() .call_on(&this) - .await; - match &updates_result { - Ok(_) => heartbeat.report_success().await, - Err(error) => heartbeat.report_failure(error).await, - } - let updates = updates_result?; + .await?; + heartbeat.check_in().await; let next_offset = updates.last().map_or(offset, |last_update| last_update.id + 1); info!(n = updates.len(), next_offset, "Received Telegram updates"); Ok::<_, Error>(Some((stream::iter(updates).map(Ok), (this, next_offset))))