Skip to content

Commit

Permalink
feat: skip health check if connection is not active
Browse files Browse the repository at this point in the history
1. Add a last_active field to Connection. Whenever Connection.call() is called, last_active is updated to the current timestamp.
2. Add a check_pause_elapsed field to ProxyConfig and MirrorConfig. A connection is considered inactive when the time elapsed since last_active exceeds check_pause_elapsed.
3. In the loops of the proxy and mirror health-checking threads, if the connection is inactive (idle for longer than check_pause_elapsed), that round of health checking is skipped.
4. Update the documentation.

Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Jul 9, 2024
1 parent 455c856 commit bb18970
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 59 deletions.
12 changes: 12 additions & 0 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ pub struct ProxyConfig {
/// Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
#[serde(default)]
pub use_http: bool,
/// Idle time, in seconds, after which proxy health checks are paused: a round of health checking is skipped while no request has been made for longer than this.
#[serde(default = "default_check_pause_elapsed")]
pub check_pause_elapsed: u64,
}

impl Default for ProxyConfig {
Expand All @@ -917,6 +920,7 @@ impl Default for ProxyConfig {
fallback: true,
check_interval: 5,
use_http: false,
check_pause_elapsed: 300,
}
}
}
Expand All @@ -938,6 +942,9 @@ pub struct MirrorConfig {
/// Maximum number of failures before marking a mirror as unusable.
#[serde(default = "default_failure_limit")]
pub failure_limit: u8,
/// Idle time, in seconds, after which mirror health checks are paused: a round of health checking is skipped while no request has been made for longer than this.
#[serde(default = "default_check_pause_elapsed")]
pub check_pause_elapsed: u64,
}

impl Default for MirrorConfig {
Expand All @@ -948,6 +955,7 @@ impl Default for MirrorConfig {
health_check_interval: 5,
failure_limit: 5,
ping_url: String::new(),
check_pause_elapsed: 300,
}
}
}
Expand Down Expand Up @@ -1191,6 +1199,10 @@ fn default_check_interval() -> u64 {
5
}

/// Serde default for the `check_pause_elapsed` config fields, in seconds.
///
/// Returns 300, the same value the `Default` impls of the configs use.
fn default_check_pause_elapsed() -> u64 {
    const DEFAULT_CHECK_PAUSE_ELAPSED_SECS: u64 = 300;
    DEFAULT_CHECK_PAUSE_ELAPSED_SECS
}

/// Serde default for the mirror `failure_limit` field: the maximum number of
/// failures before a mirror is marked as unusable.
fn default_failure_limit() -> u8 {
    const DEFAULT_FAILURE_LIMIT: u8 = 5;
    DEFAULT_FAILURE_LIMIT
}
Expand Down
8 changes: 7 additions & 1 deletion docs/nydusd.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ The `HttpProxy` backend also supports the `Proxy` and `Mirrors` configurations f

##### Enable Mirrors for Storage Backend (Recommend)

Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode, please refer the [doc](https://d7y.io/docs/setup/integration/nydus) to learn how configuring Nydus to use Dragonfly.
Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode; please refer to the [doc](https://d7y.io/docs/next/operations/integrations/container-runtime/nydus/) to learn how to configure Nydus to use Dragonfly.

Add `device.backend.config.mirrors` field to enable mirrors for storage backend. The mirror can be a P2P distribution server or registry. If the request to mirror server failed, it will fall back to the original registry.
Currently, the mirror mode is only tested in the registry backend, and in theory, the OSS backend also supports it.
Expand Down Expand Up @@ -356,6 +356,9 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
"health_check_interval": 5,
// Failure counts before disabling this mirror. Use 5 as default if left empty.
"failure_limit": 5,
// Elapsed time to pause mirror health check when the request is inactive, in seconds.
// Use 300 as default if left empty.
"check_pause_elapsed": 300,
},
{
"host": "http://dragonfly2.io:65001",
Expand Down Expand Up @@ -393,6 +396,9 @@ Add `device.backend.config.proxy` field to enable HTTP proxy for storage backend
"ping_url": "http://p2p-proxy:40901/server/ping",
// Interval of P2P proxy health checking, in seconds
"check_interval": 5,
// Elapsed time to pause proxy health check when the request is inactive, in seconds.
// Use 300 as default if left empty.
"check_pause_elapsed": 300,
},
...
}
Expand Down
159 changes: 101 additions & 58 deletions storage/src/backend/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::io::{Read, Result};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU8, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{fmt, thread};
Expand Down Expand Up @@ -185,14 +185,16 @@ struct ProxyHealth {
status: AtomicBool,
ping_url: Option<Url>,
check_interval: Duration,
check_pause_elapsed: u64,
}

impl ProxyHealth {
fn new(check_interval: u64, ping_url: Option<Url>) -> Self {
fn new(check_interval: u64, check_pause_elapsed: u64, ping_url: Option<Url>) -> Self {
ProxyHealth {
status: AtomicBool::from(true),
ping_url,
check_interval: Duration::from_secs(check_interval),
check_pause_elapsed,
}
}

Expand Down Expand Up @@ -265,6 +267,8 @@ pub(crate) struct Connection {
proxy: Option<Arc<Proxy>>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
/// Timestamp of the connection's last active request, represented as seconds elapsed since UNIX_EPOCH.
last_active: Arc<AtomicU64>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -314,7 +318,11 @@ impl Connection {
};
Some(Arc::new(Proxy {
client: Self::build_connection(&config.proxy.url, config)?,
health: ProxyHealth::new(config.proxy.check_interval, ping_url),
health: ProxyHealth::new(
config.proxy.check_interval,
config.proxy.check_pause_elapsed,
ping_url,
),
fallback: config.proxy.fallback,
use_http: config.proxy.use_http,
replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
Expand All @@ -340,9 +348,15 @@ impl Connection {
proxy,
mirrors,
shutdown: AtomicBool::new(false),
last_active: Arc::new(AtomicU64::new(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
)),
});

// Start proxy's health checking thread.
// Start proxy's health checking thread.
connection.start_proxy_health_thread(config.connect_timeout as u64);

// Start mirrors' health checking thread.
Expand All @@ -355,37 +369,47 @@ impl Connection {
if let Some(proxy) = self.proxy.as_ref() {
if proxy.health.ping_url.is_some() {
let proxy = proxy.clone();
// Spawn thread to update the health status of proxy server
let last_active = Arc::clone(&self.last_active);

// Spawn thread to update the health status of proxy server.
thread::spawn(move || {
let ping_url = proxy.health.ping_url.as_ref().unwrap();
let mut last_success = true;

loop {
let client = Client::new();
let _ = client
.get(ping_url.clone())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
let success = is_success_status(resp.status());
if last_success && !success {
warn!(
"Detected proxy unhealthy when pinging proxy, response status {}",
resp.status()
);
} else if !last_success && success {
info!("Backend proxy recovered")
}
last_success = success;
proxy.health.set(success);
})
.map_err(|e| {
if last_success {
warn!("Detected proxy unhealthy when ping proxy, {}", e);
}
last_success = false;
proxy.health.set(false)
});
let elapsed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
- last_active.load(Ordering::Relaxed);
// If the connection is not active for a set time, skip proxy health check.
if elapsed <= proxy.health.check_pause_elapsed {
let client = Client::new();
let _ = client
.get(ping_url.clone())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
let success = is_success_status(resp.status());
if last_success && !success {
warn!(
"Detected proxy unhealthy when pinging proxy, response status {}",
resp.status()
);
} else if !last_success && success {
info!("Backend proxy recovered")
}
last_success = success;
proxy.health.set(success);
})
.map_err(|e| {
if last_success {
warn!("Detected proxy unhealthy when ping proxy, {}", e);
}
last_success = false;
proxy.health.set(false)
});
}

thread::sleep(proxy.health.check_interval);
}
Expand All @@ -397,6 +421,9 @@ impl Connection {
fn start_mirrors_health_thread(&self, timeout: u64) {
for mirror in self.mirrors.iter() {
let mirror_cloned = mirror.clone();
let last_active = Arc::clone(&self.last_active);

// Spawn thread to update the health status of mirror server.
thread::spawn(move || {
let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
format!("{}/v2", mirror_cloned.config.host)
Expand All @@ -410,35 +437,43 @@ impl Connection {

let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"[mirror] server unhealthy, try to recover: {}",
mirror_cloned.config.host
);
let elapsed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
- last_active.load(Ordering::Relaxed);
// If the connection is not active for a set time, skip mirror health check.
if elapsed <= mirror_cloned.config.check_pause_elapsed {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"[mirror] server unhealthy, try to recover: {}",
mirror_cloned.config.host
);

let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!(
"[mirror] server recovered: {}",
mirror_cloned.config.host
let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!(
"[mirror] server recovered: {}",
mirror_cloned.config.host
);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"[mirror] failed to recover server: {}, {}",
mirror_cloned.config.host, e
);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"[mirror] failed to recover server: {}, {}",
mirror_cloned.config.host, e
);
});
});
}
}

thread::sleep(Duration::from_secs(
Expand Down Expand Up @@ -467,6 +502,13 @@ impl Connection {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
}
self.last_active.store(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
Ordering::Relaxed,
);

if let Some(proxy) = &self.proxy {
if proxy.health.ok() {
Expand Down Expand Up @@ -713,7 +755,7 @@ mod tests {

#[test]
fn test_proxy_health() {
let checker = ProxyHealth::new(5, None);
let checker = ProxyHealth::new(5, 300, None);

assert!(checker.ok());
assert!(checker.ok());
Expand Down Expand Up @@ -741,6 +783,7 @@ mod tests {
assert_eq!(config.connect_timeout, 5);
assert_eq!(config.retry_limit, 0);
assert_eq!(config.proxy.check_interval, 5);
assert_eq!(config.proxy.check_pause_elapsed, 300);
assert!(config.proxy.fallback);
assert_eq!(config.proxy.ping_url, "");
assert_eq!(config.proxy.url, "");
Expand Down

0 comments on commit bb18970

Please sign in to comment.