Skip to content

Commit

Permalink
Retry when no or partial response from server.
Browse files Browse the repository at this point in the history
Retry when server fails unexpectedly, or if there are network issues that are not handled by
hyper.
  • Loading branch information
kindly committed Apr 26, 2023
1 parent b981921 commit 3822a3c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
3 changes: 2 additions & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ walkdir = "2"

# Cloud storage support
base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
hyper = { version = "0.14", default-features = false, optional = true }
quick-xml = { version = "0.28.0", features = ["serialize"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
Expand All @@ -66,7 +67,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut
nix = "0.26.1"

[features]
cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
Expand Down
42 changes: 37 additions & 5 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use reqwest::header::LOCATION;
use reqwest::{Response, StatusCode};
use std::time::{Duration, Instant};
use tracing::info;
use snafu::Error as SnafuError;

/// Retry request error
#[derive(Debug)]
Expand Down Expand Up @@ -192,11 +193,29 @@ impl RetryExt for reqwest::RequestBuilder {
},
Err(e) =>
{
return Err(Error{
retries,
message: "request error".to_string(),
source: Some(e)
})
let mut do_retry = false;
if let Some(source) = e.source() {
if let Some(e) = source.downcast_ref::<hyper::Error>() {
if e.is_connect() || e.is_closed() || e.is_incomplete_message() {
do_retry = true;
}
}
}

if retries == max_retries
|| now.elapsed() > retry_timeout
|| !do_retry {

return Err(Error{
retries,
message: "request error".to_string(),
source: Some(e)
})
}
let sleep = backoff.next();
retries += 1;
info!("Encountered request error ({}) backing off for {} seconds, retry {} of {}", e, sleep.as_secs_f32(), retries, max_retries);
tokio::time::sleep(sleep).await;
}
}
}
Expand Down Expand Up @@ -345,6 +364,19 @@ mod tests {
assert_eq!(e.retries, retry.max_retries);
assert_eq!(e.message, "502 Bad Gateway");

// Panic results in an incomplete message error in the client
mock.push_fn(|_| {panic!()});
let r = do_request().await.unwrap();
assert_eq!(r.status(), StatusCode::OK);

// Gives up after retrying mulitiple panics
for _ in 0..=retry.max_retries {
mock.push_fn(|_| {panic!()});
}
let e = do_request().await.unwrap_err();
assert_eq!(e.retries, retry.max_retries);
assert_eq!(e.message, "request error");

// Shutdown
mock.shutdown().await
}
Expand Down

0 comments on commit 3822a3c

Please sign in to comment.