diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 3adaa8aca65..9b1b663b207 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -442,12 +442,12 @@ impl, E: KvEngine> Endpoint { fn on_change_cfg(&mut self, change: ConfigChange) { // Validate first. let mut validate_cfg = self.config.clone(); - validate_cfg.update(change.clone()); + validate_cfg.update(change); if let Err(e) = validate_cfg.validate() { warn!("cdc config update failed"; "error" => ?e); return; } - + let change = self.config.diff(&validate_cfg); info!( "cdc config updated"; "current config" => ?self.config, @@ -1542,13 +1542,13 @@ mod tests { let mut updated_cfg = cfg.clone(); { // Update it to be smaller than incremental_scan_threads, - // which will be an invalid change and will be lost. + // which will be an invalid change and will modified to incremental_scan_threads. updated_cfg.incremental_scan_concurrency = 2; } let diff = cfg.diff(&updated_cfg); ep.run(Task::ChangeConfig(diff)); - assert_eq!(ep.config.incremental_scan_concurrency, 6); - assert_eq!(ep.scan_concurrency_semaphore.available_permits(), 6); + assert_eq!(ep.config.incremental_scan_concurrency, 4); + assert_eq!(ep.scan_concurrency_semaphore.available_permits(), 4); { // Correct update. diff --git a/components/sst_importer/src/config.rs b/components/sst_importer/src/config.rs index a25d34ea24b..ef74a40fd01 100644 --- a/components/sst_importer/src/config.rs +++ b/components/sst_importer/src/config.rs @@ -27,12 +27,21 @@ impl Default for Config { } impl Config { - pub fn validate(&self) -> Result<(), Box> { + pub fn validate(&mut self) -> Result<(), Box> { + let default_cfg = Config::default(); if self.num_threads == 0 { - return Err("import.num_threads can not be 0".into()); + warn!( + "import.num_threads can not be 0, change it to {}", + default_cfg.num_threads + ); + self.num_threads = default_cfg.num_threads; } if self.stream_channel_window == 0 { - return Err("import.stream_channel_window can not be 0".into()); + warn!( + "import.stream_channel_window can not be 0, change it to {}", + default_cfg.stream_channel_window + ); + self.stream_channel_window = default_cfg.stream_channel_window; } Ok(()) } diff --git a/src/config.rs b/src/config.rs index 627901481d1..37278fd09e2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2316,16 +2316,29 @@ pub struct BackupConfig { } impl BackupConfig { - pub fn validate(&self) -> Result<(), Box> { + pub fn validate(&mut self) -> Result<(), Box> { let limit = SysQuota::cpu_cores_quota() as usize; + let default_cfg = BackupConfig::default(); if self.num_threads == 0 || self.num_threads > limit { - return Err(format!("backup.num_threads cannot be 0 or larger than {}", limit).into()); + warn!( + "backup.num_threads cannot be 0 or larger than {}, change it to {}", + limit, default_cfg.num_threads + ); + self.num_threads = default_cfg.num_threads; } if self.batch_size == 0 { - return Err("backup.batch_size cannot be 0".into()); + warn!( + "backup.batch_size cannot be 0, change it to {}", + default_cfg.batch_size + ); + self.batch_size = default_cfg.batch_size; } if self.s3_multi_part_size.0 > ReadableSize::gb(5).0 { - return Err("backup.s3_multi_part_size cannot larger than 5GB".into()); + warn!( + "backup.s3_multi_part_size cannot larger than 5GB, change it to {:?}", + default_cfg.s3_multi_part_size + ); + self.s3_multi_part_size = default_cfg.s3_multi_part_size; } Ok(()) @@ -2373,9 +2386,15 @@ pub struct BackupStreamConfig { } impl BackupStreamConfig { - pub fn validate(&self) -> Result<(), Box> { - if self.num_threads == 0 { - return Err("backup.num_threads cannot be 0".into()); + pub fn validate(&mut self) -> Result<(), Box> { + let limit = SysQuota::cpu_cores_quota() as usize; + let default_cfg = BackupStreamConfig::default(); + if self.num_threads == 0 || self.num_threads > limit { + warn!( + "log_backup.num_threads cannot be 0 or larger than {}, change it to {}", + limit, default_cfg.num_threads + ); + self.num_threads = default_cfg.num_threads; } Ok(()) } @@ -2460,25 +2479,38 @@ impl Default for CdcConfig { impl CdcConfig { pub fn validate(&mut self) -> Result<(), Box> { + let default_cfg = CdcConfig::default(); if self.min_ts_interval.is_zero() { - return Err("cdc.min-ts-interval can't be 0".into()); + warn!( + "cdc.min-ts-interval can't be 0, change it to {}", + default_cfg.min_ts_interval + ); + self.min_ts_interval = default_cfg.min_ts_interval; } if self.incremental_scan_threads == 0 { - return Err("cdc.incremental-scan-threads can't be 0".into()); + warn!( + "cdc.incremental-scan-threads can't be 0, change it to {}", + default_cfg.incremental_scan_threads + ); + self.incremental_scan_threads = default_cfg.incremental_scan_threads; } if self.incremental_scan_concurrency < self.incremental_scan_threads { - return Err( - "cdc.incremental-scan-concurrency must be larger than cdc.incremental-scan-threads" - .into(), + warn!( + "cdc.incremental-scan-concurrency must be larger than cdc.incremental-scan-threads, + change it to {}", + self.incremental_scan_threads ); + self.incremental_scan_concurrency = self.incremental_scan_threads } if self.incremental_scan_ts_filter_ratio < 0.0 || self.incremental_scan_ts_filter_ratio > 1.0 { - return Err( - "cdc.incremental-scan-ts-filter-ratio should be larger than 0 and less than 1" - .into(), + warn!( + "cdc.incremental-scan-ts-filter-ratio should be larger than 0 and less than 1, + change it to {}", + default_cfg.incremental_scan_ts_filter_ratio ); + self.incremental_scan_ts_filter_ratio = default_cfg.incremental_scan_ts_filter_ratio; } Ok(()) } @@ -5125,21 +5157,21 @@ mod tests { min-ts-interval = "0s" "#; let mut cfg: TiKvConfig = toml::from_str(content).unwrap(); - cfg.validate().unwrap_err(); + cfg.validate().unwrap(); let content = r#" [cdc] incremental-scan-threads = 0 "#; let mut cfg: TiKvConfig = toml::from_str(content).unwrap(); - cfg.validate().unwrap_err(); + cfg.validate().unwrap(); let content = r#" [cdc] incremental-scan-concurrency = 0 "#; let mut cfg: TiKvConfig = toml::from_str(content).unwrap(); - cfg.validate().unwrap_err(); + cfg.validate().unwrap(); let content = r#" [cdc] @@ -5147,7 +5179,7 @@ mod tests { incremental-scan-threads = 2 "#; let mut cfg: TiKvConfig = toml::from_str(content).unwrap(); - cfg.validate().unwrap_err(); + cfg.validate().unwrap(); } #[test]