From 30f350fc5fafb987eb533bd787486ed01cef0b02 Mon Sep 17 00:00:00 2001 From: Alexander Lyon Date: Tue, 21 May 2024 13:57:44 +0200 Subject: [PATCH] add two retry strategies to allow requests to timeout gracefully (#8080) Certain APIs should time out in different ways. For the cache, we want to retry connection attempts but not retry upload timeouts as a retry is unlikely to change your connection speed. --- Cargo.lock | 1 + crates/turborepo-api-client/Cargo.toml | 1 + crates/turborepo-api-client/src/analytics.rs | 3 +- crates/turborepo-api-client/src/lib.rs | 64 ++++++--- crates/turborepo-api-client/src/retry.rs | 129 +++++++++++++++++-- crates/turborepo-api-client/src/spaces.rs | 14 +- crates/turborepo-api-client/src/telemetry.rs | 3 +- 7 files changed, 177 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 032c44d65f5129..905bfbec2558d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10939,6 +10939,7 @@ dependencies = [ "anyhow", "chrono", "http 0.2.11", + "httpmock", "lazy_static", "port_scanner", "regex", diff --git a/crates/turborepo-api-client/Cargo.toml b/crates/turborepo-api-client/Cargo.toml index 2e28f3a877d568..c900b52ae24118 100644 --- a/crates/turborepo-api-client/Cargo.toml +++ b/crates/turborepo-api-client/Cargo.toml @@ -11,6 +11,7 @@ rustls-tls = ["reqwest/rustls-tls-native-roots"] [dev-dependencies] http = "0.2.9" +httpmock = { workspace = true } port_scanner = { workspace = true } test-case = { workspace = true } turborepo-vercel-api-mock = { workspace = true } diff --git a/crates/turborepo-api-client/src/analytics.rs b/crates/turborepo-api-client/src/analytics.rs index d073b637c7f3e5..c847c266ee6ce0 100644 --- a/crates/turborepo-api-client/src/analytics.rs +++ b/crates/turborepo-api-client/src/analytics.rs @@ -25,8 +25,9 @@ impl AnalyticsClient for APIClient { .await? .json(&events); - retry::make_retryable_request(request_builder) + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) .await? + .into_response() .error_for_status()?; Ok(()) diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs index 72c2fed788e6d0..1a659ec471c0d1 100644 --- a/crates/turborepo-api-client/src/lib.rs +++ b/crates/turborepo-api-client/src/lib.rs @@ -1,5 +1,6 @@ #![feature(async_closure)] #![feature(error_generic_member_access)] +#![feature(assert_matches)] #![deny(clippy::all)] use std::{backtrace::Backtrace, env, future::Future, time::Duration}; @@ -134,9 +135,11 @@ impl Client for APIClient { .header("User-Agent", self.user_agent.clone()) .header("Authorization", format!("Bearer {}", token)) .header("Content-Type", "application/json"); - let response = retry::make_retryable_request(request_builder) - .await? - .error_for_status()?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response() + .error_for_status()?; Ok(response.json().await?) } @@ -149,9 +152,11 @@ impl Client for APIClient { .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)); - let response = retry::make_retryable_request(request_builder) - .await? - .error_for_status()?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response() + .error_for_status()?; Ok(response.json().await?) } @@ -194,9 +199,11 @@ impl Client for APIClient { .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)); - let response = retry::make_retryable_request(request_builder) - .await? - .error_for_status()?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response() + .error_for_status()?; Ok(response.json().await?) } @@ -208,9 +215,11 @@ impl Client for APIClient { .query(&[("token", token), ("tokenName", token_name)]) .header("User-Agent", self.user_agent.clone()); - let response = retry::make_retryable_request(request_builder) - .await? - .error_for_status()?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response() + .error_for_status()?; let verification_response: VerificationResponse = response.json().await?; @@ -310,7 +319,9 @@ impl CacheClient for APIClient { request_builder = Self::add_team_params(request_builder, team_id, team_slug); - let response = retry::make_retryable_request(request_builder).await?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?; + let response = response.into_response(); match response.status() { StatusCode::FORBIDDEN => Err(Self::handle_403(response).await), @@ -391,7 +402,10 @@ impl CacheClient for APIClient { request_builder = request_builder.header("x-artifact-tag", tag); } - let response = retry::make_retryable_request(request_builder).await?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Connection) + .await? + .into_response(); if response.status() == StatusCode::FORBIDDEN { return Err(Self::handle_403(response).await); @@ -416,9 +430,11 @@ impl CacheClient for APIClient { let request_builder = Self::add_team_params(request_builder, team_id, team_slug); - let response = retry::make_retryable_request(request_builder) - .await? - .error_for_status()?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response() + .error_for_status()?; Ok(response.json().await?) } @@ -451,7 +467,9 @@ impl TokenClient for APIClient { invalid_token: bool, } - let response = retry::make_retryable_request(request_builder).await?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?; + let response = response.into_response(); let status = response.status(); // Give a better error message for invalid tokens. This endpoint returns the // following statuses: @@ -503,7 +521,10 @@ impl TokenClient for APIClient { invalid_token: bool, } - let response = retry::make_retryable_request(request_builder).await?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response(); let status = response.status(); // Give a better error message for invalid tokens. This endpoint returns the // following statuses: @@ -604,7 +625,10 @@ impl APIClient { .header("Access-Control-Request-Headers", request_headers) .header("Authorization", format!("Bearer {}", token)); - let response = retry::make_retryable_request(request_builder).await?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response(); let headers = response.headers(); let location = if let Some(location) = headers.get("Location") { diff --git a/crates/turborepo-api-client/src/retry.rs b/crates/turborepo-api-client/src/retry.rs index ffe19bc063624c..8f027534d5782a 100644 --- a/crates/turborepo-api-client/src/retry.rs +++ b/crates/turborepo-api-client/src/retry.rs @@ -7,6 +7,30 @@ const MIN_SLEEP_TIME_SECS: u64 = 2; const MAX_SLEEP_TIME_SECS: u64 = 10; const RETRY_MAX: u32 = 2; +#[derive(Debug)] +pub enum Retry { + Once(Response), + #[allow(dead_code)] + Retried(Response, u32), +} + +impl Retry { + pub fn into_response(self) -> Response { + match self { + Retry::Once(response) => response, + Retry::Retried(response, _) => response, + } + } + + #[allow(dead_code)] + pub fn retry_count(&self) -> Option { + match self { + Retry::Once(_) => None, + Retry::Retried(_, count) => Some(*count), + } + } +} + /// Retries a request until `RETRY_MAX` is reached, the `should_retry_request` /// function returns false, or the future succeeds. Uses an exponential backoff /// with a base of 2 to delay between retries. @@ -15,11 +39,13 @@ const RETRY_MAX: u32 = 2; /// /// * `request_builder`: The request builder with everything, i.e. headers and /// body already set. NOTE: This must be cloneable, so no streams are allowed. +/// * `strategy`: The strategy to use for retrying requests. /// /// returns: Result pub(crate) async fn make_retryable_request( request_builder: RequestBuilder, -) -> Result { + strategy: RetryStrategy, +) -> Result { let mut last_error = None; for retry_count in 0..RETRY_MAX { // A request builder can fail to clone for two reasons: @@ -28,12 +54,12 @@ pub(crate) async fn make_retryable_request( // - the request body is a stream, in this case we'll just send the one request // we have let Some(builder) = request_builder.try_clone() else { - return Ok(request_builder.send().await?); + return Ok(Retry::Once(request_builder.send().await?)); }; match builder.send().await { - Ok(value) => return Ok(value), + Ok(value) => return Ok(Retry::Retried(value, retry_count)), Err(err) => { - if !should_retry_request(&err) { + if !strategy.should_retry(&err) { return Err(err.into()); } last_error = Some(err); @@ -49,16 +75,97 @@ pub(crate) async fn make_retryable_request( Err(Error::TooManyFailures(Box::new(last_error.unwrap()))) } -fn should_retry_request(error: &reqwest::Error) -> bool { - if let Some(status) = error.status() { - if status == StatusCode::TOO_MANY_REQUESTS { - return true; +/// A retry strategy. Note that error statuses and TOO_MANY_REQUESTS are always +/// retried. +pub enum RetryStrategy { + /// Retry in the case of connection issues, but ignore timeouts. + Connection, + /// Retry in the case of connection issues and timeouts. + Timeout, +} + +impl RetryStrategy { + fn should_retry(&self, error: &reqwest::Error) -> bool { + if let Some(status) = error.status() { + if status == StatusCode::TOO_MANY_REQUESTS { + return true; + } + + if status.as_u16() >= 500 && status.as_u16() != 501 { + return true; + } } - if status.as_u16() >= 500 && status.as_u16() != 501 { - return true; + match self { + RetryStrategy::Connection => error.is_connect(), + RetryStrategy::Timeout => error.is_timeout(), } } +} - error.is_request() || error.is_timeout() +#[cfg(test)] +mod test { + use std::{assert_matches::assert_matches, time::Duration}; + + use crate::{ + retry::{make_retryable_request, RetryStrategy}, + Error, + }; + + #[tokio::test] + async fn handles_too_many_failures() { + let mock = httpmock::MockServer::start_async().await; + let req = mock + .mock_async(|when, then| { + when.method(httpmock::Method::GET); + then.delay(Duration::from_secs(100)); + }) + .await; + + let request_builder = reqwest::Client::new() + .get(mock.url("/")) + .timeout(Duration::from_millis(10)); + let result = make_retryable_request(request_builder, RetryStrategy::Timeout).await; + + req.assert_hits_async(2).await; + assert_matches!(result, Err(Error::TooManyFailures(_))); + } + + #[tokio::test] + async fn handles_connection_timeout() { + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_millis(10)) + .build() + .unwrap(); + + let request_builder = client.get("http://localhost:1").send().await; // bad port + let should_retry = RetryStrategy::Connection.should_retry(&request_builder.unwrap_err()); + + assert_matches!(should_retry, true); + } + + #[tokio::test] + async fn handles_connection_timeout_retries() { + let client = reqwest::Client::builder() + .timeout(Duration::from_millis(20)) + .connect_timeout(Duration::from_millis(10)) + .build() + .unwrap(); + + let mock = httpmock::MockServer::start_async().await; + let req = mock + .mock_async(|when, then| { + when.method(httpmock::Method::GET); + then.delay(Duration::from_secs(100)); + }) + .await; + + let request_builder = client.get(mock.url("/")); // bad port + let result = make_retryable_request(request_builder, RetryStrategy::Connection).await; + + // we should make at most one request and give up if it times out after + // connecting + assert_matches!(result, Err(_)); + req.assert_hits_async(1).await; + } } diff --git a/crates/turborepo-api-client/src/spaces.rs b/crates/turborepo-api-client/src/spaces.rs index 51a40327d1b0af..cc6f11a78e90ee 100644 --- a/crates/turborepo-api-client/src/spaces.rs +++ b/crates/turborepo-api-client/src/spaces.rs @@ -152,9 +152,11 @@ impl APIClient { .await? .json(&payload); - let response = retry::make_retryable_request(request_builder) - .await? - .error_for_status()?; + let response = + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) + .await? + .into_response() + .error_for_status()?; Ok(response.json().await?) } @@ -176,8 +178,9 @@ impl APIClient { .await? .json(&task); - retry::make_retryable_request(request_builder) + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) .await? + .into_response() .error_for_status()?; Ok(()) @@ -201,8 +204,9 @@ impl APIClient { .await? .json(&payload); - retry::make_retryable_request(request_builder) + retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout) .await? + .into_response() .error_for_status()?; Ok(()) diff --git a/crates/turborepo-api-client/src/telemetry.rs b/crates/turborepo-api-client/src/telemetry.rs index e3f0466b39d9f5..a0382af1dc779d 100644 --- a/crates/turborepo-api-client/src/telemetry.rs +++ b/crates/turborepo-api-client/src/telemetry.rs @@ -33,8 +33,9 @@ impl TelemetryClient for AnonAPIClient { .header("x-turbo-session-id", session_id) .json(&events); - retry::make_retryable_request(telemetry_request) + retry::make_retryable_request(telemetry_request, retry::RetryStrategy::Timeout) .await? + .into_response() .error_for_status()?; Ok(())