Skip to content

Commit

Permalink
config: output warn log when some components config invalid (tikv#12767)
Browse files Browse the repository at this point in the history
close tikv#12771

Signed-off-by: 3pointer <luancheng@pingcap.com>

Co-authored-by: zhangjinpeng1987 <zhangjinpeng@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people authored and BornChanger committed Jun 10, 2022
1 parent af04c37 commit 7d1d3c2
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 27 deletions.
10 changes: 5 additions & 5 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,12 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
fn on_change_cfg(&mut self, change: ConfigChange) {
// Validate first.
let mut validate_cfg = self.config.clone();
validate_cfg.update(change.clone());
validate_cfg.update(change);
if let Err(e) = validate_cfg.validate() {
warn!("cdc config update failed"; "error" => ?e);
return;
}

let change = self.config.diff(&validate_cfg);
info!(
"cdc config updated";
"current config" => ?self.config,
Expand Down Expand Up @@ -1542,13 +1542,13 @@ mod tests {
let mut updated_cfg = cfg.clone();
{
// Update it to be smaller than incremental_scan_threads,
// which will be an invalid change and will be lost.
// which will be an invalid change and will modified to incremental_scan_threads.
updated_cfg.incremental_scan_concurrency = 2;
}
let diff = cfg.diff(&updated_cfg);
ep.run(Task::ChangeConfig(diff));
assert_eq!(ep.config.incremental_scan_concurrency, 6);
assert_eq!(ep.scan_concurrency_semaphore.available_permits(), 6);
assert_eq!(ep.config.incremental_scan_concurrency, 4);
assert_eq!(ep.scan_concurrency_semaphore.available_permits(), 4);

{
// Correct update.
Expand Down
15 changes: 12 additions & 3 deletions components/sst_importer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@ impl Default for Config {
}

impl Config {
pub fn validate(&self) -> Result<(), Box<dyn Error>> {
pub fn validate(&mut self) -> Result<(), Box<dyn Error>> {
let default_cfg = Config::default();
if self.num_threads == 0 {
return Err("import.num_threads can not be 0".into());
warn!(
"import.num_threads can not be 0, change it to {}",
default_cfg.num_threads
);
self.num_threads = default_cfg.num_threads;
}
if self.stream_channel_window == 0 {
return Err("import.stream_channel_window can not be 0".into());
warn!(
"import.stream_channel_window can not be 0, change it to {}",
default_cfg.stream_channel_window
);
self.stream_channel_window = default_cfg.stream_channel_window;
}
Ok(())
}
Expand Down
70 changes: 51 additions & 19 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2316,16 +2316,29 @@ pub struct BackupConfig {
}

impl BackupConfig {
pub fn validate(&self) -> Result<(), Box<dyn Error>> {
pub fn validate(&mut self) -> Result<(), Box<dyn Error>> {
let limit = SysQuota::cpu_cores_quota() as usize;
let default_cfg = BackupConfig::default();
if self.num_threads == 0 || self.num_threads > limit {
return Err(format!("backup.num_threads cannot be 0 or larger than {}", limit).into());
warn!(
"backup.num_threads cannot be 0 or larger than {}, change it to {}",
limit, default_cfg.num_threads
);
self.num_threads = default_cfg.num_threads;
}
if self.batch_size == 0 {
return Err("backup.batch_size cannot be 0".into());
warn!(
"backup.batch_size cannot be 0, change it to {}",
default_cfg.batch_size
);
self.batch_size = default_cfg.batch_size;
}
if self.s3_multi_part_size.0 > ReadableSize::gb(5).0 {
return Err("backup.s3_multi_part_size cannot larger than 5GB".into());
warn!(
"backup.s3_multi_part_size cannot larger than 5GB, change it to {:?}",
default_cfg.s3_multi_part_size
);
self.s3_multi_part_size = default_cfg.s3_multi_part_size;
}

Ok(())
Expand Down Expand Up @@ -2373,9 +2386,15 @@ pub struct BackupStreamConfig {
}

impl BackupStreamConfig {
pub fn validate(&self) -> Result<(), Box<dyn Error>> {
if self.num_threads == 0 {
return Err("backup.num_threads cannot be 0".into());
pub fn validate(&mut self) -> Result<(), Box<dyn Error>> {
let limit = SysQuota::cpu_cores_quota() as usize;
let default_cfg = BackupStreamConfig::default();
if self.num_threads == 0 || self.num_threads > limit {
warn!(
"log_backup.num_threads cannot be 0 or larger than {}, change it to {}",
limit, default_cfg.num_threads
);
self.num_threads = default_cfg.num_threads;
}
Ok(())
}
Expand Down Expand Up @@ -2460,25 +2479,38 @@ impl Default for CdcConfig {

impl CdcConfig {
pub fn validate(&mut self) -> Result<(), Box<dyn Error>> {
let default_cfg = CdcConfig::default();
if self.min_ts_interval.is_zero() {
return Err("cdc.min-ts-interval can't be 0".into());
warn!(
"cdc.min-ts-interval can't be 0, change it to {}",
default_cfg.min_ts_interval
);
self.min_ts_interval = default_cfg.min_ts_interval;
}
if self.incremental_scan_threads == 0 {
return Err("cdc.incremental-scan-threads can't be 0".into());
warn!(
"cdc.incremental-scan-threads can't be 0, change it to {}",
default_cfg.incremental_scan_threads
);
self.incremental_scan_threads = default_cfg.incremental_scan_threads;
}
if self.incremental_scan_concurrency < self.incremental_scan_threads {
return Err(
"cdc.incremental-scan-concurrency must be larger than cdc.incremental-scan-threads"
.into(),
warn!(
"cdc.incremental-scan-concurrency must be larger than cdc.incremental-scan-threads,
change it to {}",
self.incremental_scan_threads
);
self.incremental_scan_concurrency = self.incremental_scan_threads
}
if self.incremental_scan_ts_filter_ratio < 0.0
|| self.incremental_scan_ts_filter_ratio > 1.0
{
return Err(
"cdc.incremental-scan-ts-filter-ratio should be larger than 0 and less than 1"
.into(),
warn!(
"cdc.incremental-scan-ts-filter-ratio should be larger than 0 and less than 1,
change it to {}",
default_cfg.incremental_scan_ts_filter_ratio
);
self.incremental_scan_ts_filter_ratio = default_cfg.incremental_scan_ts_filter_ratio;
}
Ok(())
}
Expand Down Expand Up @@ -5125,29 +5157,29 @@ mod tests {
min-ts-interval = "0s"
"#;
let mut cfg: TiKvConfig = toml::from_str(content).unwrap();
cfg.validate().unwrap_err();
cfg.validate().unwrap();

let content = r#"
[cdc]
incremental-scan-threads = 0
"#;
let mut cfg: TiKvConfig = toml::from_str(content).unwrap();
cfg.validate().unwrap_err();
cfg.validate().unwrap();

let content = r#"
[cdc]
incremental-scan-concurrency = 0
"#;
let mut cfg: TiKvConfig = toml::from_str(content).unwrap();
cfg.validate().unwrap_err();
cfg.validate().unwrap();

let content = r#"
[cdc]
incremental-scan-concurrency = 1
incremental-scan-threads = 2
"#;
let mut cfg: TiKvConfig = toml::from_str(content).unwrap();
cfg.validate().unwrap_err();
cfg.validate().unwrap();
}

#[test]
Expand Down

0 comments on commit 7d1d3c2

Please sign in to comment.