Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry GCP requests on server error #2243

Merged
merged 7 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ quick-xml = { version = "0.23.0", features = ["serialize"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
ring = { version = "0.16", default-features = false, features = ["std"] }
base64 = { version = "0.13", default-features = false, optional = true }
rand = { version = "0.8", optional = true, features = ["std", "std_rng"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The features listed here are actually the default features that should always be included. So you could remove the explicit listing or pass default-features = false to prevent a silent extension of this feature set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following the pattern established above, I'm not really sure which is better tbh. Using default-features is nice, but some crates have lots, I went for consistency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the crates above also use default-features = false which rand now doesn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, oops, yeah that's a typo 😅

# for rusoto
hyper = { version = "0.14", optional = true, default-features = false }
# for rusoto
Expand All @@ -58,7 +59,7 @@ percent-encoding = "2.1"
rusoto_core = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_sts = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_sts = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
snafu = "0.7"
tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time", "io-util"] }
tracing = { version = "0.1" }
Expand All @@ -71,7 +72,7 @@ walkdir = "2"
[features]
azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64"]
gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]

[dev-dependencies] # In alphabetical order
Expand Down
156 changes: 156 additions & 0 deletions object_store/src/client/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use rand::prelude::*;
use std::time::Duration;

/// Exponential backoff with jitter
///
/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
#[allow(missing_copy_implementations)]
#[derive(Debug, Clone)]
pub struct BackoffConfig {
/// The initial backoff duration
pub init_backoff: Duration,
/// The maximum backoff duration
pub max_backoff: Duration,
/// The base of the exponential to use
pub base: f64,
}

impl Default for BackoffConfig {
fn default() -> Self {
Self {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(15),
base: 2.,
}
}
}

/// [`Backoff`] can be created from a [`BackoffConfig`]
///
/// Consecutive calls to [`Backoff::next`] will return the next backoff interval
///
pub struct Backoff {
init_backoff: f64,
next_backoff_secs: f64,
max_backoff_secs: f64,
base: f64,
rng: Option<Box<dyn RngCore + Sync + Send>>,
}

impl std::fmt::Debug for Backoff {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Backoff")
.field("init_backoff", &self.init_backoff)
.field("next_backoff_secs", &self.next_backoff_secs)
.field("max_backoff_secs", &self.max_backoff_secs)
.field("base", &self.base)
.finish()
}
}

impl Backoff {
/// Create a new [`Backoff`] from the provided [`BackoffConfig`]
pub fn new(config: &BackoffConfig) -> Self {
Self::new_with_rng(config, None)
}

/// Creates a new `Backoff` with the optional `rng`
///
/// Used [`rand::thread_rng()`] if no rng provided
pub fn new_with_rng(
config: &BackoffConfig,
rng: Option<Box<dyn RngCore + Sync + Send>>,
) -> Self {
let init_backoff = config.init_backoff.as_secs_f64();
Self {
init_backoff,
next_backoff_secs: init_backoff,
max_backoff_secs: config.max_backoff.as_secs_f64(),
base: config.base,
rng,
}
}

/// Returns the next backoff duration to wait for
pub fn next(&mut self) -> Duration {
let range = self.init_backoff..(self.next_backoff_secs * self.base);

let rand_backoff = match self.rng.as_mut() {
Some(rng) => rng.gen_range(range),
None => thread_rng().gen_range(range),
};

let next_backoff = self.max_backoff_secs.min(rand_backoff);
Duration::from_secs_f64(std::mem::replace(
&mut self.next_backoff_secs,
next_backoff,
))
}
}

#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::mock::StepRng;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL that there's rand::rngs::mock 👍


#[test]
fn test_backoff() {
let init_backoff_secs = 1.;
let max_backoff_secs = 500.;
let base = 3.;

let config = BackoffConfig {
init_backoff: Duration::from_secs_f64(init_backoff_secs),
max_backoff: Duration::from_secs_f64(max_backoff_secs),
base,
};

let assert_fuzzy_eq =
|a: f64, b: f64| assert!((b - a).abs() < 0.0001, "{} != {}", a, b);

// Create a static rng that takes the minimum of the range
let rng = Box::new(StepRng::new(0, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

for _ in 0..20 {
assert_eq!(backoff.next().as_secs_f64(), init_backoff_secs);
}

// Create a static rng that takes the maximum of the range
let rng = Box::new(StepRng::new(u64::MAX, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

for i in 0..20 {
let value = (base.powi(i) * init_backoff_secs).min(max_backoff_secs);
assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
}

// Create a static rng that takes the mid point of the range
let rng = Box::new(StepRng::new(u64::MAX / 2, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

let mut value = init_backoff_secs;
for _ in 0..20 {
assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
value = (init_backoff_secs + (value * base - init_backoff_secs) / 2.)
.min(max_backoff_secs);
}
}
}
23 changes: 23 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Generic utilities reqwest based ObjectStore implementations

pub mod backoff;
pub mod oauth;
pub mod retry;
pub mod token;
12 changes: 9 additions & 3 deletions object_store/src/oauth.rs → object_store/src/client/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::token::TemporaryToken;
use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
use crate::RetryConfig;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -133,7 +135,11 @@ impl OAuthProvider {
}

/// Fetch a fresh token
pub async fn fetch_token(&self, client: &Client) -> Result<TemporaryToken<String>> {
pub async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> Result<TemporaryToken<String>> {
let now = seconds_since_epoch();
let exp = now + 3600;

Expand Down Expand Up @@ -168,7 +174,7 @@ impl OAuthProvider {
let response: TokenResponse = client
.request(Method::POST, &self.audience)
.form(&body)
.send()
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.error_for_status()
Expand Down
103 changes: 103 additions & 0 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! A shared HTTP client implementation incorporating retries

use crate::client::backoff::{Backoff, BackoffConfig};
use futures::future::BoxFuture;
use futures::FutureExt;
use reqwest::{Response, Result};
use std::time::{Duration, Instant};

/// Contains the configuration for how to respond to server errors
///
/// By default they will be retried up to some limit, using exponential
/// backoff with jitter. See [`BackoffConfig`] for more information
///
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// The backoff configuration
pub backoff: BackoffConfig,

/// The maximum number of times to retry a request
///
/// Set to 0 to disable retries
pub max_retries: usize,

/// The maximum length of time from the initial request
/// after which no further retries will be attempted
///
/// This not only bounds the length of time before a server
/// error will be surfaced to the application, but also bounds
/// the length of time a request's credentials must remain valid.
///
/// As requests are retried without renewing credentials or
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could theoretically re-sign requests / regenerate credentials, however, I decided against this for a couple of reasons:

  • It's non-trivial additional complexity
  • The intent of this feature is to hide intermittent failures, a 5 minute outage is not really intermittent
  • We want to surface the error to the user eventually

/// regenerating request payloads, this number should be kept
/// below 5 minutes to avoid errors due to expired credentials
/// and/or request payloads
pub retry_timeout: Duration,
}

impl Default for RetryConfig {
fn default() -> Self {
Self {
backoff: Default::default(),
max_retries: 10,
retry_timeout: Duration::from_secs(3 * 60),
}
}
}

pub trait RetryExt {
/// Dispatch a request with the given retry configuration
///
/// # Panic
///
/// This will panic if the request body is a stream
fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>>;
}

impl RetryExt for reqwest::RequestBuilder {
fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some logging would be nice here for:

  • retries
  • giving up

let mut backoff = Backoff::new(&config.backoff);
let max_retries = config.max_retries;
let retry_timeout = config.retry_timeout;

async move {
let mut retries = 0;
let now = Instant::now();

loop {
let s = self.try_clone().expect("request body must be cloneable");
match s.send().await {
Err(e)
if retries < max_retries
&& now.elapsed() < retry_timeout
&& e.status()
.map(|s| s.is_server_error())
.unwrap_or(false) =>
{
retries += 1;
tokio::time::sleep(backoff.next()).await;
}
r => return r,
}
}
}
.boxed()
}
}
File renamed without changes.
Loading