Skip to content

Commit

Permalink
feat: skip proxy health check if connection is not active
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Jul 3, 2024
1 parent 332f3dd commit 74bdd96
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 30 deletions.
8 changes: 8 additions & 0 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ pub struct ProxyConfig {
/// Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
#[serde(default)]
pub use_http: bool,
/// Connection is considered to be inactive if the current time to the last active time exceeds max_elapsed_time.
#[serde(default = "default_max_elapsed_time")]
pub max_elapsed_time: u64,
}

impl Default for ProxyConfig {
Expand All @@ -917,6 +920,7 @@ impl Default for ProxyConfig {
fallback: true,
check_interval: 5,
use_http: false,
max_elapsed_time: 300,
}
}
}
Expand Down Expand Up @@ -1191,6 +1195,10 @@ fn default_check_interval() -> u64 {
5
}

fn default_max_elapsed_time() -> u64 {
300
}

fn default_failure_limit() -> u8 {
5
}
Expand Down
76 changes: 46 additions & 30 deletions storage/src/backend/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::io::{Read, Result};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU8, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{fmt, thread};

Expand Down Expand Up @@ -185,14 +185,16 @@ struct ProxyHealth {
status: AtomicBool,
ping_url: Option<Url>,
check_interval: Duration,
max_elapsed_time: Duration,
}

impl ProxyHealth {
fn new(check_interval: u64, ping_url: Option<Url>) -> Self {
fn new(check_interval: u64, max_elapsed_time: u64, ping_url: Option<Url>) -> Self {
ProxyHealth {
status: AtomicBool::from(true),
ping_url,
check_interval: Duration::from_secs(check_interval),
max_elapsed_time: Duration::from_secs(max_elapsed_time),
}
}

Expand Down Expand Up @@ -265,6 +267,7 @@ pub(crate) struct Connection {
proxy: Option<Arc<Proxy>>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
last_active: Arc<Mutex<Instant>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -314,7 +317,11 @@ impl Connection {
};
Some(Arc::new(Proxy {
client: Self::build_connection(&config.proxy.url, config)?,
health: ProxyHealth::new(config.proxy.check_interval, ping_url),
health: ProxyHealth::new(
config.proxy.check_interval,
config.proxy.max_elapsed_time,
ping_url,
),
fallback: config.proxy.fallback,
use_http: config.proxy.use_http,
replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
Expand All @@ -340,9 +347,10 @@ impl Connection {
proxy,
mirrors,
shutdown: AtomicBool::new(false),
last_active: Arc::new(Mutex::new(Instant::now())),
});

// Start proxy's health checking thread.
// Start proxy's health checking thread.
connection.start_proxy_health_thread(config.connect_timeout as u64);

// Start mirrors' health checking thread.
Expand All @@ -355,37 +363,42 @@ impl Connection {
if let Some(proxy) = self.proxy.as_ref() {
if proxy.health.ping_url.is_some() {
let proxy = proxy.clone();
let last_active = self.last_active.clone();
// Spawn thread to update the health status of proxy server
thread::spawn(move || {
let ping_url = proxy.health.ping_url.as_ref().unwrap();
let mut last_success = true;

loop {
let client = Client::new();
let _ = client
.get(ping_url.clone())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
let success = is_success_status(resp.status());
if last_success && !success {
warn!(
"Detected proxy unhealthy when pinging proxy, response status {}",
resp.status()
);
} else if !last_success && success {
info!("Backend proxy recovered")
}
last_success = success;
proxy.health.set(success);
})
.map_err(|e| {
if last_success {
warn!("Detected proxy unhealthy when ping proxy, {}", e);
}
last_success = false;
proxy.health.set(false)
});
let elapsed = last_active.lock().unwrap().elapsed();
// If the connection is not active for a long time, skip the health check.
if elapsed <= proxy.health.max_elapsed_time {
let client = Client::new();
let _ = client
.get(ping_url.clone())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
let success = is_success_status(resp.status());
if last_success && !success {
warn!(
"Detected proxy unhealthy when pinging proxy, response status {}",
resp.status()
);
} else if !last_success && success {
info!("Backend proxy recovered")
}
last_success = success;
proxy.health.set(success);
})
.map_err(|e| {
if last_success {
warn!("Detected proxy unhealthy when ping proxy, {}", e);
}
last_success = false;
proxy.health.set(false)
});
}

thread::sleep(proxy.health.check_interval);
}
Expand Down Expand Up @@ -466,6 +479,8 @@ impl Connection {
) -> ConnectionResult<Response> {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
} else {
*self.last_active.lock().unwrap() = Instant::now();
}

if let Some(proxy) = &self.proxy {
Expand Down Expand Up @@ -713,7 +728,7 @@ mod tests {

#[test]
fn test_proxy_health() {
let checker = ProxyHealth::new(5, None);
let checker = ProxyHealth::new(5, 300, None);

assert!(checker.ok());
assert!(checker.ok());
Expand Down Expand Up @@ -741,6 +756,7 @@ mod tests {
assert_eq!(config.connect_timeout, 5);
assert_eq!(config.retry_limit, 0);
assert_eq!(config.proxy.check_interval, 5);
assert_eq!(config.proxy.max_elapsed_time, 300);
assert!(config.proxy.fallback);
assert_eq!(config.proxy.ping_url, "");
assert_eq!(config.proxy.url, "");
Expand Down

0 comments on commit 74bdd96

Please sign in to comment.