Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: skip proxy health check if connection is not active #1595

Merged
merged 2 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ pub struct ProxyConfig {
/// Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
#[serde(default)]
pub use_http: bool,
/// Elapsed time to pause proxy health check when the request is inactive, in seconds.
#[serde(default = "default_check_pause_elapsed")]
pub check_pause_elapsed: u64,
}

impl Default for ProxyConfig {
Expand All @@ -917,6 +920,7 @@ impl Default for ProxyConfig {
fallback: true,
check_interval: 5,
use_http: false,
check_pause_elapsed: 300,
}
}
}
Expand All @@ -938,6 +942,9 @@ pub struct MirrorConfig {
/// Maximum number of failures before marking a mirror as unusable.
#[serde(default = "default_failure_limit")]
pub failure_limit: u8,
/// Elapsed time to pause mirror health check when the request is inactive, in seconds.
#[serde(default = "default_check_pause_elapsed")]
pub health_check_pause_elapsed: u64,
}

impl Default for MirrorConfig {
Expand All @@ -948,6 +955,7 @@ impl Default for MirrorConfig {
health_check_interval: 5,
failure_limit: 5,
ping_url: String::new(),
health_check_pause_elapsed: 300,
}
}
}
Expand Down Expand Up @@ -1191,6 +1199,10 @@ fn default_check_interval() -> u64 {
5
}

/// Serde fallback for the `check_pause_elapsed` (proxy) and
/// `health_check_pause_elapsed` (mirror) settings when the field is omitted
/// from the configuration. The value is expressed in seconds.
fn default_check_pause_elapsed() -> u64 {
    const DEFAULT_PAUSE_ELAPSED_SECS: u64 = 300;
    DEFAULT_PAUSE_ELAPSED_SECS
}

/// Serde fallback for a mirror's `failure_limit` when the field is omitted
/// from the configuration: the number of failures tolerated before the mirror
/// is marked as unusable.
fn default_failure_limit() -> u8 {
    const DEFAULT_FAILURE_LIMIT: u8 = 5;
    DEFAULT_FAILURE_LIMIT
}
Expand Down
8 changes: 7 additions & 1 deletion docs/nydusd.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ The `HttpProxy` backend also supports the `Proxy` and `Mirrors` configurations f

##### Enable Mirrors for Storage Backend (Recommend)

Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode, please refer the [doc](https://d7y.io/docs/setup/integration/nydus) to learn how configuring Nydus to use Dragonfly.
Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode; please refer to the [doc](https://d7y.io/docs/next/operations/integrations/container-runtime/nydus/) to learn how to configure Nydus to use Dragonfly.

Add `device.backend.config.mirrors` field to enable mirrors for storage backend. The mirror can be a P2P distribution server or registry. If the request to mirror server failed, it will fall back to the original registry.
Currently, the mirror mode is only tested in the registry backend, and in theory, the OSS backend also supports it.
Expand Down Expand Up @@ -356,6 +356,9 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
"health_check_interval": 5,
// Failure counts before disabling this mirror. Use 5 as default if left empty.
"failure_limit": 5,
// Elapsed time to pause mirror health check when the request is inactive, in seconds.
// Use 300 as default if left empty.
"health_check_pause_elapsed": 300,
},
{
"host": "http://dragonfly2.io:65001",
Expand Down Expand Up @@ -393,6 +396,9 @@ Add `device.backend.config.proxy` field to enable HTTP proxy for storage backend
"ping_url": "http://p2p-proxy:40901/server/ping",
// Interval of P2P proxy health checking, in seconds
"check_interval": 5,
// Elapsed time to pause proxy health check when the request is inactive, in seconds.
// Use 300 as default if left empty.
"check_pause_elapsed": 300,
},
...
}
Expand Down
159 changes: 101 additions & 58 deletions storage/src/backend/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::io::{Read, Result};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU8, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{fmt, thread};
Expand Down Expand Up @@ -185,14 +185,16 @@ struct ProxyHealth {
status: AtomicBool,
ping_url: Option<Url>,
check_interval: Duration,
check_pause_elapsed: u64,
}

impl ProxyHealth {
fn new(check_interval: u64, ping_url: Option<Url>) -> Self {
fn new(check_interval: u64, check_pause_elapsed: u64, ping_url: Option<Url>) -> Self {
ProxyHealth {
status: AtomicBool::from(true),
ping_url,
check_interval: Duration::from_secs(check_interval),
check_pause_elapsed,
}
}

Expand Down Expand Up @@ -265,6 +267,8 @@ pub(crate) struct Connection {
proxy: Option<Arc<Proxy>>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
/// Timestamp of the connection's last active request, represented as the duration since UNIX_EPOCH in seconds.
last_active: Arc<AtomicU64>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -314,7 +318,11 @@ impl Connection {
};
Some(Arc::new(Proxy {
client: Self::build_connection(&config.proxy.url, config)?,
health: ProxyHealth::new(config.proxy.check_interval, ping_url),
health: ProxyHealth::new(
config.proxy.check_interval,
config.proxy.check_pause_elapsed,
ping_url,
),
fallback: config.proxy.fallback,
use_http: config.proxy.use_http,
replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
Expand All @@ -340,9 +348,15 @@ impl Connection {
proxy,
mirrors,
shutdown: AtomicBool::new(false),
last_active: Arc::new(AtomicU64::new(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
)),
});

// Start proxy's health checking thread.
// Start proxy's health checking thread.
connection.start_proxy_health_thread(config.connect_timeout as u64);

// Start mirrors' health checking thread.
Expand All @@ -355,37 +369,47 @@ impl Connection {
if let Some(proxy) = self.proxy.as_ref() {
if proxy.health.ping_url.is_some() {
let proxy = proxy.clone();
// Spawn thread to update the health status of proxy server
let last_active = Arc::clone(&self.last_active);

// Spawn thread to update the health status of proxy server.
thread::spawn(move || {
let ping_url = proxy.health.ping_url.as_ref().unwrap();
let mut last_success = true;

loop {
let client = Client::new();
let _ = client
.get(ping_url.clone())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
let success = is_success_status(resp.status());
if last_success && !success {
warn!(
"Detected proxy unhealthy when pinging proxy, response status {}",
resp.status()
);
} else if !last_success && success {
info!("Backend proxy recovered")
}
last_success = success;
proxy.health.set(success);
})
.map_err(|e| {
if last_success {
warn!("Detected proxy unhealthy when ping proxy, {}", e);
}
last_success = false;
proxy.health.set(false)
});
let elapsed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
- last_active.load(Ordering::Relaxed);
// If the connection is not active for a set time, skip proxy health check.
if elapsed <= proxy.health.check_pause_elapsed {
let client = Client::new();
let _ = client
.get(ping_url.clone())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
let success = is_success_status(resp.status());
if last_success && !success {
warn!(
"Detected proxy unhealthy when pinging proxy, response status {}",
resp.status()
);
} else if !last_success && success {
info!("Backend proxy recovered")
}
last_success = success;
proxy.health.set(success);
})
.map_err(|e| {
if last_success {
warn!("Detected proxy unhealthy when ping proxy, {}", e);
}
last_success = false;
proxy.health.set(false)
});
}

thread::sleep(proxy.health.check_interval);
}
Expand All @@ -397,6 +421,9 @@ impl Connection {
fn start_mirrors_health_thread(&self, timeout: u64) {
for mirror in self.mirrors.iter() {
let mirror_cloned = mirror.clone();
let last_active = Arc::clone(&self.last_active);

// Spawn thread to update the health status of mirror server.
thread::spawn(move || {
let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
format!("{}/v2", mirror_cloned.config.host)
Expand All @@ -410,35 +437,43 @@ impl Connection {

let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"[mirror] server unhealthy, try to recover: {}",
mirror_cloned.config.host
);
let elapsed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
- last_active.load(Ordering::Relaxed);
// If the connection is not active for a set time, skip mirror health check.
if elapsed <= mirror_cloned.config.health_check_pause_elapsed {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"[mirror] server unhealthy, try to recover: {}",
mirror_cloned.config.host
);

let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!(
"[mirror] server recovered: {}",
mirror_cloned.config.host
let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!(
"[mirror] server recovered: {}",
mirror_cloned.config.host
);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"[mirror] failed to recover server: {}, {}",
mirror_cloned.config.host, e
);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"[mirror] failed to recover server: {}, {}",
mirror_cloned.config.host, e
);
});
});
}
}

thread::sleep(Duration::from_secs(
Expand Down Expand Up @@ -467,6 +502,13 @@ impl Connection {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
}
self.last_active.store(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
Ordering::Relaxed,
);

if let Some(proxy) = &self.proxy {
if proxy.health.ok() {
Expand Down Expand Up @@ -713,7 +755,7 @@ mod tests {

#[test]
fn test_proxy_health() {
let checker = ProxyHealth::new(5, None);
let checker = ProxyHealth::new(5, 300, None);

assert!(checker.ok());
assert!(checker.ok());
Expand Down Expand Up @@ -741,6 +783,7 @@ mod tests {
assert_eq!(config.connect_timeout, 5);
assert_eq!(config.retry_limit, 0);
assert_eq!(config.proxy.check_interval, 5);
assert_eq!(config.proxy.check_pause_elapsed, 300);
assert!(config.proxy.fallback);
assert_eq!(config.proxy.ping_url, "");
assert_eq!(config.proxy.url, "");
Expand Down
Loading