diff --git a/api/src/config.rs b/api/src/config.rs index b2f93bca75e..770b1b65e29 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -907,6 +907,9 @@ pub struct ProxyConfig { /// Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy. #[serde(default)] pub use_http: bool, + /// Elapsed time to pause proxy health check when the request is inactive, in seconds. + #[serde(default = "default_check_pause_elapsed")] + pub check_pause_elapsed: u64, } impl Default for ProxyConfig { @@ -917,6 +920,7 @@ impl Default for ProxyConfig { fallback: true, check_interval: 5, use_http: false, + check_pause_elapsed: 300, } } } @@ -938,6 +942,9 @@ pub struct MirrorConfig { /// Maximum number of failures before marking a mirror as unusable. #[serde(default = "default_failure_limit")] pub failure_limit: u8, + /// Elapsed time to pause mirror health check when the request is inactive, in seconds. + #[serde(default = "default_check_pause_elapsed")] + pub check_pause_elapsed: u64, } impl Default for MirrorConfig { @@ -948,6 +955,7 @@ impl Default for MirrorConfig { health_check_interval: 5, failure_limit: 5, ping_url: String::new(), + check_pause_elapsed: 300, } } } @@ -1191,6 +1199,10 @@ fn default_check_interval() -> u64 { 5 } +fn default_check_pause_elapsed() -> u64 { + 300 +} + fn default_failure_limit() -> u8 { 5 } diff --git a/docs/nydusd.md b/docs/nydusd.md index 242941e4266..a2a434f45b9 100644 --- a/docs/nydusd.md +++ b/docs/nydusd.md @@ -324,7 +324,7 @@ The `HttpProxy` backend also supports the `Proxy` and `Mirrors` configurations f ##### Enable Mirrors for Storage Backend (Recommend) -Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode, please refer the [doc](https://d7y.io/docs/setup/integration/nydus) to learn how configuring Nydus to use Dragonfly. 
+Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode, please refer to the [doc](https://d7y.io/docs/next/operations/integrations/container-runtime/nydus/) to learn how to configure Nydus to use Dragonfly. Add `device.backend.config.mirrors` field to enable mirrors for storage backend. The mirror can be a P2P distribution server or registry. If the request to mirror server failed, it will fall back to the original registry. Currently, the mirror mode is only tested in the registry backend, and in theory, the OSS backend also supports it. @@ -356,6 +356,9 @@ Currently, the mirror mode is only tested in the registry backend, and in theory "health_check_interval": 5, // Failure counts before disabling this mirror. Use 5 as default if left empty. "failure_limit": 5, + // Elapsed time to pause mirror health check when the request is inactive, in seconds. + // Use 300 as default if left empty. + "check_pause_elapsed": 300, }, { "host": "http://dragonfly2.io:65001", @@ -393,6 +396,9 @@ Add `device.backend.config.proxy` field to enable HTTP proxy for storage backend "ping_url": "http://p2p-proxy:40901/server/ping", // Interval of P2P proxy health checking, in seconds - "check_interval": 5 + "check_interval": 5, + // Elapsed time to pause proxy health check when the request is inactive, in seconds. + // Use 300 as default if left empty. + "check_pause_elapsed": 300, }, ... 
} diff --git a/storage/src/backend/connection.rs b/storage/src/backend/connection.rs index 6b6b2e69e43..4883ffcf85d 100644 --- a/storage/src/backend/connection.rs +++ b/storage/src/backend/connection.rs @@ -7,7 +7,7 @@ use std::cell::RefCell; use std::collections::HashMap; use std::io::{Read, Result}; use std::str::FromStr; -use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU8, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::{fmt, thread}; @@ -185,14 +185,16 @@ struct ProxyHealth { status: AtomicBool, ping_url: Option, check_interval: Duration, + check_pause_elapsed: u64, } impl ProxyHealth { - fn new(check_interval: u64, ping_url: Option) -> Self { + fn new(check_interval: u64, check_pause_elapsed: u64, ping_url: Option) -> Self { ProxyHealth { status: AtomicBool::from(true), ping_url, check_interval: Duration::from_secs(check_interval), + check_pause_elapsed, } } @@ -265,6 +267,8 @@ pub(crate) struct Connection { proxy: Option>, pub mirrors: Vec>, pub shutdown: AtomicBool, + /// Timestamp of connection's last active request, represents as duration since UNIX_EPOCH in seconds. + last_active: Arc, } #[derive(Debug)] @@ -314,7 +318,11 @@ impl Connection { }; Some(Arc::new(Proxy { client: Self::build_connection(&config.proxy.url, config)?, - health: ProxyHealth::new(config.proxy.check_interval, ping_url), + health: ProxyHealth::new( + config.proxy.check_interval, + config.proxy.check_pause_elapsed, + ping_url, + ), fallback: config.proxy.fallback, use_http: config.proxy.use_http, replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET), @@ -340,9 +348,15 @@ impl Connection { proxy, mirrors, shutdown: AtomicBool::new(false), + last_active: Arc::new(AtomicU64::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(), + )), }); - // Start proxy's health checking thread. 
+ // Start proxy's health checking thread. connection.start_proxy_health_thread(config.connect_timeout as u64); // Start mirrors' health checking thread. @@ -355,37 +369,47 @@ impl Connection { if let Some(proxy) = self.proxy.as_ref() { if proxy.health.ping_url.is_some() { let proxy = proxy.clone(); - // Spawn thread to update the health status of proxy server + let last_active = Arc::clone(&self.last_active); + + // Spawn thread to update the health status of proxy server. thread::spawn(move || { let ping_url = proxy.health.ping_url.as_ref().unwrap(); let mut last_success = true; loop { - let client = Client::new(); - let _ = client - .get(ping_url.clone()) - .timeout(Duration::from_secs(connect_timeout as u64)) - .send() - .map(|resp| { - let success = is_success_status(resp.status()); - if last_success && !success { - warn!( - "Detected proxy unhealthy when pinging proxy, response status {}", - resp.status() - ); - } else if !last_success && success { - info!("Backend proxy recovered") - } - last_success = success; - proxy.health.set(success); - }) - .map_err(|e| { - if last_success { - warn!("Detected proxy unhealthy when ping proxy, {}", e); - } - last_success = false; - proxy.health.set(false) - }); + let elapsed = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + - last_active.load(Ordering::Relaxed); + // If the connection is not active for a set time, skip proxy health check. 
+ if elapsed <= proxy.health.check_pause_elapsed { + let client = Client::new(); + let _ = client + .get(ping_url.clone()) + .timeout(Duration::from_secs(connect_timeout as u64)) + .send() + .map(|resp| { + let success = is_success_status(resp.status()); + if last_success && !success { + warn!( + "Detected proxy unhealthy when pinging proxy, response status {}", + resp.status() + ); + } else if !last_success && success { + info!("Backend proxy recovered") + } + last_success = success; + proxy.health.set(success); + }) + .map_err(|e| { + if last_success { + warn!("Detected proxy unhealthy when ping proxy, {}", e); + } + last_success = false; + proxy.health.set(false) + }); + } thread::sleep(proxy.health.check_interval); } @@ -397,6 +421,9 @@ impl Connection { fn start_mirrors_health_thread(&self, timeout: u64) { for mirror in self.mirrors.iter() { let mirror_cloned = mirror.clone(); + let last_active = Arc::clone(&self.last_active); + + // Spawn thread to update the health status of mirror server. thread::spawn(move || { let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() { format!("{}/v2", mirror_cloned.config.host) @@ -410,35 +437,43 @@ impl Connection { let client = Client::new(); loop { - // Try to recover the mirror server when it is unavailable. - if !mirror_cloned.status.load(Ordering::Relaxed) { - info!( - "[mirror] server unhealthy, try to recover: {}", - mirror_cloned.config.host - ); + let elapsed = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + - last_active.load(Ordering::Relaxed); + // If the connection is not active for a set time, skip mirror health check. + if elapsed <= mirror_cloned.config.check_pause_elapsed { + // Try to recover the mirror server when it is unavailable. 
+ if !mirror_cloned.status.load(Ordering::Relaxed) { + info!( + "[mirror] server unhealthy, try to recover: {}", + mirror_cloned.config.host + ); - let _ = client - .get(mirror_health_url.as_str()) - .timeout(Duration::from_secs(timeout as u64)) - .send() - .map(|resp| { - // If the response status is less than StatusCode::INTERNAL_SERVER_ERROR, - // the mirror server is recovered. - if resp.status() < StatusCode::INTERNAL_SERVER_ERROR { - info!( - "[mirror] server recovered: {}", - mirror_cloned.config.host + let _ = client + .get(mirror_health_url.as_str()) + .timeout(Duration::from_secs(timeout as u64)) + .send() + .map(|resp| { + // If the response status is less than StatusCode::INTERNAL_SERVER_ERROR, + // the mirror server is recovered. + if resp.status() < StatusCode::INTERNAL_SERVER_ERROR { + info!( + "[mirror] server recovered: {}", + mirror_cloned.config.host + ); + mirror_cloned.failed_times.store(0, Ordering::Relaxed); + mirror_cloned.status.store(true, Ordering::Relaxed); + } + }) + .map_err(|e| { + warn!( + "[mirror] failed to recover server: {}, {}", + mirror_cloned.config.host, e ); - mirror_cloned.failed_times.store(0, Ordering::Relaxed); - mirror_cloned.status.store(true, Ordering::Relaxed); - } - }) - .map_err(|e| { - warn!( - "[mirror] failed to recover server: {}, {}", - mirror_cloned.config.host, e - ); - }); + }); + } } thread::sleep(Duration::from_secs( @@ -467,6 +502,13 @@ impl Connection { if self.shutdown.load(Ordering::Acquire) { return Err(ConnectionError::Disconnected); } + self.last_active.store( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(), + Ordering::Relaxed, + ); if let Some(proxy) = &self.proxy { if proxy.health.ok() { @@ -713,7 +755,7 @@ mod tests { #[test] fn test_proxy_health() { - let checker = ProxyHealth::new(5, None); + let checker = ProxyHealth::new(5, 300, None); assert!(checker.ok()); assert!(checker.ok()); @@ -741,6 +783,7 @@ mod tests { assert_eq!(config.connect_timeout, 5); 
assert_eq!(config.retry_limit, 0); assert_eq!(config.proxy.check_interval, 5); + assert_eq!(config.proxy.check_pause_elapsed, 300); assert!(config.proxy.fallback); assert_eq!(config.proxy.ping_url, ""); assert_eq!(config.proxy.url, "");