From 7e6e2f288c6356eb3d81efcce6981ce0eb4b416a Mon Sep 17 00:00:00 2001
From: BohuTANG
Date: Tue, 1 Nov 2022 20:38:14 +0800
Subject: [PATCH 1/3] feat: improve stat performance with parallelism

---
 .../src/interpreters/interpreter_common.rs  | 59 ++++++++++++++++---
 .../src/interpreters/interpreter_copy_v2.rs | 49 +++++++--------
 2 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_common.rs b/src/query/service/src/interpreters/interpreter_common.rs
index 00d8317a87ee..2aed3eb9e309 100644
--- a/src/query/service/src/interpreters/interpreter_common.rs
+++ b/src/query/service/src/interpreters/interpreter_common.rs
@@ -17,16 +17,21 @@ use std::sync::Arc;
 
 use chrono::TimeZone;
 use chrono::Utc;
+use common_base::base::tokio::sync::Semaphore;
 use common_base::base::GlobalIORuntime;
+use common_base::base::Runtime;
 use common_datavalues::DataSchemaRef;
+use common_exception::ErrorCode;
 use common_exception::Result;
 use common_meta_types::GrantObject;
 use common_meta_types::StageFile;
 use common_meta_types::UserStageInfo;
 use common_pipeline_core::Pipeline;
 use futures::TryStreamExt;
+use futures_util::future;
 use tracing::debug;
 use tracing::warn;
+use tracing::Instrument;
 
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
@@ -144,16 +149,16 @@ pub async fn validate_grant_object_exists(
     Ok(())
 }
 
-pub async fn stat_file(
-    ctx: &Arc<QueryContext>,
-    stage: &UserStageInfo,
-    path: &str,
+async fn stat_file(
+    ctx: Arc<QueryContext>,
+    stage: UserStageInfo,
+    path: String,
 ) -> Result<StageFile> {
     let table_ctx: Arc<dyn TableContext> = ctx.clone();
-    let op = StageTable::get_op(&table_ctx, stage)?;
-    let meta = op.object(path).metadata().await?;
+    let op = StageTable::get_op(&table_ctx, &stage)?;
+    let meta = op.object(&path).metadata().await?;
     Ok(StageFile {
-        path: path.to_owned(),
+        path,
         size: meta.content_length(),
         md5: meta.content_md5().map(str::to_string),
         last_modified: meta
@@ -164,6 +169,46 @@
     })
 }
 
+// Stat files in parallel.
+pub async fn stat_files(
+    ctx: &Arc<QueryContext>,
+    stage: &UserStageInfo,
+    files: impl IntoIterator<Item = impl AsRef<str>>,
+) -> Result<Vec<Result<StageFile>>> {
+    let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
+    let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
+
+    // 1.1 Combine all the tasks.
+    let mut iter = files.into_iter();
+    let tasks = std::iter::from_fn(move || {
+        let ctx = ctx.clone();
+        if let Some(location) = iter.next() {
+            let location = location.as_ref().to_owned();
+            Some(
+                stat_file(ctx, stage.clone(), location)
+                    .instrument(tracing::debug_span!("stat_file")),
+            )
+        } else {
+            None
+        }
+    });
+
+    // 1.2 Build the runtime.
+    let semaphore = Semaphore::new(max_io_requests);
+    let stat_runtime = Arc::new(Runtime::with_worker_threads(
+        max_runtime_threads,
+        Some("stat-files-worker".to_owned()),
+    )?);
+
+    // 1.3 Spawn all the tasks onto the runtime.
+    let join_handles = stat_runtime.try_spawn_batch(semaphore, tasks).await?;
+
+    // 1.4 Collect all the results.
+    future::try_join_all(join_handles)
+        .await
+        .map_err(|e| ErrorCode::LogicalError(format!("Stat files in parallel failed: {}", e)))
+}
+
 /// List files from DAL in recursive way.
 ///
 /// - If input path is a dir, we will list it recursively.
diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs
index b62280ff8d7b..5eb4256727bb 100644
--- a/src/query/service/src/interpreters/interpreter_copy_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -32,7 +32,7 @@ use regex::Regex;
 use super::append2table;
 use crate::catalogs::Catalog;
 use crate::interpreters::interpreter_common::list_files;
-use crate::interpreters::interpreter_common::stat_file;
+use crate::interpreters::interpreter_common::stat_files;
 use crate::interpreters::Interpreter;
 use crate::interpreters::SelectInterpreterV2;
 use crate::pipelines::PipelineBuildResult;
@@ -55,7 +55,7 @@ impl CopyInterpreterV2 {
         Ok(CopyInterpreterV2 { ctx, plan })
     }
 
-    async fn do_query_copied_files_info(
+    async fn get_copied_files_info(
         &self,
         catalog_name: String,
         database_name: String,
@@ -121,7 +121,7 @@
             let mut file_info = BTreeMap::new();
             for query_copied_files in files.chunks(MAX_QUERY_COPIED_FILES_NUM) {
-                self.do_query_copied_files_info(
+                self.get_copied_files_info(
                     catalog_name.to_string(),
                     database_name.to_string(),
                     table_id,
@@ -131,26 +131,28 @@
                 .await?;
             }
 
-            for file in files.iter() {
-                let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;
-
-                if let Some(file_info) = file_info.get(file) {
-                    match &file_info.etag {
+            let stat_infos = stat_files(&self.ctx, &table_info.stage_info, files).await?;
+            for stat_info in stat_infos.into_iter().flatten() {
+                if let Some(copied_file_info) = file_info.get(&stat_info.path) {
+                    match &copied_file_info.etag {
                         Some(_etag) => {
                             // No need to copy the file again if etag is_some and match.
-                            if stage_file.etag == file_info.etag {
-                                tracing::warn!("ignore copy file {:?} matched by etag", file);
+                            if stat_info.etag == copied_file_info.etag {
+                                tracing::warn!(
+                                    "ignore copy file {:?} matched by etag",
+                                    copied_file_info
+                                );
                                 continue;
                             }
                         }
                         None => {
                             // etag is none, compare with content_length and last_modified.
-                            if file_info.content_length == stage_file.size
-                                && file_info.last_modified == Some(stage_file.last_modified)
+                            if copied_file_info.content_length == stat_info.size
+                                && copied_file_info.last_modified == Some(stat_info.last_modified)
                             {
                                 tracing::warn!(
                                     "ignore copy file {:?} matched by content_length and last_modified",
-                                    file
+                                    copied_file_info
                                 );
                                 continue;
                             }
                         }
                     }
                 }
 
                 // unmatch case: insert into file map for copy.
-                file_map.insert(file.clone(), TableCopiedFileInfo {
-                    etag: stage_file.etag.clone(),
-                    content_length: stage_file.size,
-                    last_modified: Some(stage_file.last_modified),
+                file_map.insert(stat_info.path.clone(), TableCopiedFileInfo {
+                    etag: stat_info.etag.clone(),
+                    content_length: stat_info.size,
+                    last_modified: Some(stat_info.last_modified),
                 });
             }
         } else {
             // if force is true, copy all the file.
-            for file in files.iter() {
-                let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;
-
-                file_map.insert(file.clone(), TableCopiedFileInfo {
-                    etag: stage_file.etag.clone(),
-                    content_length: stage_file.size,
-                    last_modified: Some(stage_file.last_modified),
+            let stat_infos = stat_files(&self.ctx, &table_info.stage_info, files).await?;
+            for stat_info in stat_infos.into_iter().flatten() {
+                file_map.insert(stat_info.path, TableCopiedFileInfo {
+                    etag: stat_info.etag.clone(),
+                    content_length: stat_info.size,
+                    last_modified: Some(stat_info.last_modified),
                 });
             }
         }

From 7ac62eb4e4afe7a0431f9cb3bd72d74680b8afb8 Mon Sep 17 00:00:00 2001
From: BohuTANG
Date: Tue, 1 Nov 2022 21:27:46 +0800
Subject: [PATCH 2/3] change stat parallelism to max threads

---
 src/query/service/src/interpreters/interpreter_common.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_common.rs b/src/query/service/src/interpreters/interpreter_common.rs
index 2aed3eb9e309..28535eec57ca 100644
--- a/src/query/service/src/interpreters/interpreter_common.rs
+++ b/src/query/service/src/interpreters/interpreter_common.rs
@@ -175,9 +175,6 @@ pub async fn stat_files(
     stage: &UserStageInfo,
     files: impl IntoIterator<Item = impl AsRef<str>>,
 ) -> Result<Vec<Result<StageFile>>> {
-    let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
-    let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
-
     // 1.1 Combine all the tasks.
     let mut iter = files.into_iter();
     let tasks = std::iter::from_fn(move || {
         let ctx = ctx.clone();
         if let Some(location) = iter.next() {
             let location = location.as_ref().to_owned();
             Some(
                 stat_file(ctx, stage.clone(), location)
                     .instrument(tracing::debug_span!("stat_file")),
             )
         } else {
             None
         }
     });
 
     // 1.2 Build the runtime.
-    let semaphore = Semaphore::new(max_io_requests);
+    let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
+    let semaphore = Semaphore::new(max_runtime_threads);
     let stat_runtime = Arc::new(Runtime::with_worker_threads(
         max_runtime_threads,
         Some("stat-files-worker".to_owned()),

From f71d1beffc9d0de3c2feaf3e29e17008b1fd1f07 Mon Sep 17 00:00:00 2001
From: BohuTANG
Date: Tue, 1 Nov 2022 21:58:43 +0800
Subject: [PATCH 3/3] reuse StageFile for filtering copied files

---
 .../src/interpreters/interpreter_common.rs  |  57 ++--------
 .../src/interpreters/interpreter_copy_v2.rs | 105 +++++++++---------
 2 files changed, 58 insertions(+), 104 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_common.rs b/src/query/service/src/interpreters/interpreter_common.rs
index 28535eec57ca..28ca6c390eb8 100644
--- a/src/query/service/src/interpreters/interpreter_common.rs
+++ b/src/query/service/src/interpreters/interpreter_common.rs
@@ -17,21 +17,16 @@ use std::sync::Arc;
 
 use chrono::TimeZone;
 use chrono::Utc;
-use common_base::base::tokio::sync::Semaphore;
 use common_base::base::GlobalIORuntime;
-use common_base::base::Runtime;
 use common_datavalues::DataSchemaRef;
-use common_exception::ErrorCode;
 use common_exception::Result;
 use common_meta_types::GrantObject;
 use common_meta_types::StageFile;
 use common_meta_types::UserStageInfo;
 use common_pipeline_core::Pipeline;
 use futures::TryStreamExt;
-use futures_util::future;
 use tracing::debug;
 use tracing::warn;
-use tracing::Instrument;
 
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
@@ -149,16 +144,16 @@ pub async fn validate_grant_object_exists(
     Ok(())
 }
 
-async fn stat_file(
-    ctx: Arc<QueryContext>,
-    stage: UserStageInfo,
-    path: String,
+pub async fn stat_file(
+    ctx: &Arc<QueryContext>,
+    stage: &UserStageInfo,
+    path: &str,
 ) -> Result<StageFile> {
     let table_ctx: Arc<dyn TableContext> = ctx.clone();
-    let op = StageTable::get_op(&table_ctx, &stage)?;
-    let meta = op.object(&path).metadata().await?;
+    let op = StageTable::get_op(&table_ctx, stage)?;
+    let meta = op.object(path).metadata().await?;
     Ok(StageFile {
-        path,
+        path: path.to_string(),
         size: meta.content_length(),
         md5: meta.content_md5().map(str::to_string),
         last_modified: meta
@@ -169,44 +164,6 @@
     })
 }
 
-// Stat files in parallel.
-pub async fn stat_files(
-    ctx: &Arc<QueryContext>,
-    stage: &UserStageInfo,
-    files: impl IntoIterator<Item = impl AsRef<str>>,
-) -> Result<Vec<Result<StageFile>>> {
-    // 1.1 Combine all the tasks.
-    let mut iter = files.into_iter();
-    let tasks = std::iter::from_fn(move || {
-        let ctx = ctx.clone();
-        if let Some(location) = iter.next() {
-            let location = location.as_ref().to_owned();
-            Some(
-                stat_file(ctx, stage.clone(), location)
-                    .instrument(tracing::debug_span!("stat_file")),
-            )
-        } else {
-            None
-        }
-    });
-
-    // 1.2 Build the runtime.
-    let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
-    let semaphore = Semaphore::new(max_runtime_threads);
-    let stat_runtime = Arc::new(Runtime::with_worker_threads(
-        max_runtime_threads,
-        Some("stat-files-worker".to_owned()),
-    )?);
-
-    // 1.3 Spawn all the tasks onto the runtime.
-    let join_handles = stat_runtime.try_spawn_batch(semaphore, tasks).await?;
-
-    // 1.4 Collect all the results.
-    future::try_join_all(join_handles)
-        .await
-        .map_err(|e| ErrorCode::LogicalError(format!("Stat files in parallel failed: {}", e)))
-}
-
 /// List files from DAL in recursive way.
 ///
 /// - If input path is a dir, we will list it recursively.
diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs
index 5eb4256727bb..05d98bc2c967 100644
--- a/src/query/service/src/interpreters/interpreter_copy_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -23,6 +23,7 @@ use common_exception::Result;
 use common_meta_app::schema::GetTableCopiedFileReq;
 use common_meta_app::schema::TableCopiedFileInfo;
 use common_meta_app::schema::UpsertTableCopiedFileReq;
+use common_meta_types::StageFile;
 use common_meta_types::UserStageInfo;
 use common_planner::stage_table::StageTableInfo;
 use common_planner::ReadDataSourcePlan;
@@ -32,7 +33,7 @@ use regex::Regex;
 use super::append2table;
 use crate::catalogs::Catalog;
 use crate::interpreters::interpreter_common::list_files;
-use crate::interpreters::interpreter_common::stat_files;
+use crate::interpreters::interpreter_common::stat_file;
 use crate::interpreters::Interpreter;
 use crate::interpreters::SelectInterpreterV2;
 use crate::pipelines::PipelineBuildResult;
@@ -98,14 +99,13 @@
         Ok(())
     }
 
-    async fn filter_duplicate_files(
+    async fn filter_have_copied_files(
         &self,
         force: bool,
-        table_info: &StageTableInfo,
         catalog_name: &str,
         database_name: &str,
         table_name: &str,
-        files: &[String],
+        stage_files: &[StageFile],
     ) -> Result<(u64, BTreeMap<String, TableCopiedFileInfo>)> {
         let catalog = self.ctx.get_catalog(catalog_name)?;
         let tenant = self.ctx.get_tenant();
@@ -118,41 +118,43 @@
         if !force {
             // if force is false, copy only the files that unmatch to the meta copied files info.
-            let mut file_info = BTreeMap::new();
-
+            let files = stage_files
+                .iter()
+                .map(|v| v.path.clone())
+                .collect::<Vec<_>>();
+            let mut copied_files = BTreeMap::new();
             for query_copied_files in files.chunks(MAX_QUERY_COPIED_FILES_NUM) {
                 self.get_copied_files_info(
                     catalog_name.to_string(),
                     database_name.to_string(),
                     table_id,
                     query_copied_files,
-                    &mut file_info,
+                    &mut copied_files,
                 )
                 .await?;
             }
 
-            let stat_infos = stat_files(&self.ctx, &table_info.stage_info, files).await?;
-            for stat_info in stat_infos.into_iter().flatten() {
-                if let Some(copied_file_info) = file_info.get(&stat_info.path) {
-                    match &copied_file_info.etag {
+            for stage_file in stage_files {
+                if let Some(copied_file) = copied_files.get(&stage_file.path) {
+                    match &copied_file.etag {
                         Some(_etag) => {
                             // No need to copy the file again if etag is_some and match.
-                            if stat_info.etag == copied_file_info.etag {
+                            if stage_file.etag == copied_file.etag {
                                 tracing::warn!(
                                     "ignore copy file {:?} matched by etag",
-                                    copied_file_info
+                                    copied_file
                                 );
                                 continue;
                             }
                         }
                         None => {
                             // etag is none, compare with content_length and last_modified.
-                            if copied_file_info.content_length == stat_info.size
-                                && copied_file_info.last_modified == Some(stat_info.last_modified)
+                            if copied_file.content_length == stage_file.size
+                                && copied_file.last_modified == Some(stage_file.last_modified)
                             {
                                 tracing::warn!(
                                     "ignore copy file {:?} matched by content_length and last_modified",
-                                    copied_file_info
+                                    copied_file
                                 );
                                 continue;
                             }
                         }
                     }
                 }
 
                 // unmatch case: insert into file map for copy.
-                file_map.insert(stat_info.path.clone(), TableCopiedFileInfo {
-                    etag: stat_info.etag.clone(),
-                    content_length: stat_info.size,
-                    last_modified: Some(stat_info.last_modified),
+                file_map.insert(stage_file.path.clone(), TableCopiedFileInfo {
+                    etag: stage_file.etag.clone(),
+                    content_length: stage_file.size,
+                    last_modified: Some(stage_file.last_modified),
                 });
             }
         } else {
             // if force is true, copy all the file.
-            let stat_infos = stat_files(&self.ctx, &table_info.stage_info, files).await?;
-            for stat_info in stat_infos.into_iter().flatten() {
-                file_map.insert(stat_info.path, TableCopiedFileInfo {
-                    etag: stat_info.etag.clone(),
-                    content_length: stat_info.size,
-                    last_modified: Some(stat_info.last_modified),
+            for stage_file in stage_files {
+                file_map.insert(stage_file.path.clone(), TableCopiedFileInfo {
+                    etag: stage_file.etag.clone(),
+                    content_length: stage_file.size,
+                    last_modified: Some(stage_file.last_modified),
                 });
             }
         }
@@ -427,47 +428,43 @@
             SourceInfo::StageSource(table_info) => {
                 let path = &table_info.path;
 
-                // Here we add the path to the file: /path/to/path/file1.
-                let mut files = if !files.is_empty() {
-                    let mut files_with_path = vec![];
+                let mut stage_files = if !files.is_empty() {
+                    let mut res = vec![];
+
                     for file in files {
-                        let new_path = Path::new(path).join(file);
-                        files_with_path.push(new_path.to_string_lossy().to_string());
+                        // Here we add the path to the file: /path/to/path/file1.
+                        let new_path = Path::new(path).join(file).to_string_lossy().to_string();
+                        let info =
+                            stat_file(&self.ctx, &table_info.stage_info, &new_path).await?;
+                        res.push(info);
                     }
-                    files_with_path
+                    res
                 } else {
-                    let stage_files =
-                        list_files(&self.ctx.clone(), &table_info.stage_info, path).await?;
-
-                    // TODO(@xuanwo): Reuse existing metadata in StageFile.
-                    stage_files.into_iter().map(|v| v.path).collect()
+                    list_files(&self.ctx.clone(), &table_info.stage_info, path).await?
                };
 
-                tracing::debug!("listed files: {:?}", &files);
-
-                // Pattern match check.
-                let pattern = &pattern;
-                if !pattern.is_empty() {
-                    let regex = Regex::new(pattern).map_err(|e| {
-                        ErrorCode::SyntaxException(format!(
-                            "Pattern format invalid, got:{}, error:{:?}",
-                            pattern, e
-                        ))
-                    })?;
-
-                    files.retain(|file| regex.is_match(file));
+                {
+                    let pattern = &pattern;
+                    if !pattern.is_empty() {
+                        let regex = Regex::new(pattern).map_err(|e| {
+                            ErrorCode::SyntaxException(format!(
+                                "Pattern format invalid, got:{}, error:{:?}",
+                                pattern, e
+                            ))
+                        })?;
+                        stage_files.retain(|v| regex.is_match(&v.path));
+                    }
+                    tracing::debug!("matched files: {:?}, pattern: {}", stage_files, pattern);
                 }
-
-                tracing::debug!("matched files: {:?}, pattern: {}", &files, pattern);
-
                 let (table_id, copy_stage_files) = self
-                    .filter_duplicate_files(
+                    .filter_have_copied_files(
                         *force,
-                        table_info,
                         catalog_name,
                         database_name,
                         table_name,
-                        &files,
+                        &stage_files,
                     )
                    .await?;
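
Note on the concurrency pattern in PATCH 1: `stat_files` fans the per-file `stat_file` calls out onto a dedicated runtime, and a semaphore passed to `try_spawn_batch` caps how many stat requests are in flight at once. Below is a minimal self-contained sketch of the same bounded fan-out using plain tokio primitives (`tokio::spawn`, `tokio::sync::Semaphore`, `tokio::fs::metadata`) in place of Databend's `Runtime::with_worker_threads`/`try_spawn_batch` wrappers; `stat_paths` is a hypothetical stand-in for illustration, not code from this patch series.

    use std::sync::Arc;

    use tokio::sync::Semaphore;

    /// Stat every path with at most `max_io_requests` metadata calls in
    /// flight. Each task holds a semaphore permit for the duration of its
    /// I/O, which is the role the permit plays in `try_spawn_batch` above.
    async fn stat_paths(
        paths: Vec<String>,
        max_io_requests: usize,
    ) -> Vec<std::io::Result<std::fs::Metadata>> {
        let semaphore = Arc::new(Semaphore::new(max_io_requests));
        let mut handles = Vec::with_capacity(paths.len());
        for path in paths {
            let semaphore = semaphore.clone();
            handles.push(tokio::spawn(async move {
                // Acquired before the I/O call; released when `_permit` drops
                // at the end of the task.
                let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
                tokio::fs::metadata(&path).await
            }));
        }
        // Equivalent of `future::try_join_all(join_handles)`: collect every
        // task's result, preserving submission order.
        let mut results = Vec::with_capacity(handles.len());
        for handle in handles {
            results.push(handle.await.expect("stat task panicked"));
        }
        results
    }

As in the patch, each file's outcome stays an individual `Result` (mirroring the `Result<Vec<Result<StageFile>>>` return type), so a caller can skip failed entries with `.flatten()` exactly the way `interpreter_copy_v2.rs` does before PATCH 3 reworks that path.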