Commit
Merge pull request #8586 from BohuTANG/dev-stat-file-parallel
feat: improve the COPY copied-files filtering performance
BohuTANG authored Nov 1, 2022
2 parents 932fc4b + f71d1be commit 44b2779
Showing 2 changed files with 50 additions and 52 deletions.
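
Summary of the change: previously the COPY interpreter collected candidate files as plain path strings, and filter_duplicate_files then called stat_file on every one of them, even though list_files had already returned the same metadata. The renamed filter_have_copied_files now receives the StageFile entries gathered up front and only compares their metadata against the copied-files records from the meta service. The sketch below isolates that comparison; the types are simplified stand-ins for illustration, not the actual common_meta_types::StageFile / common_meta_app::schema::TableCopiedFileInfo definitions, and the matching rules (etag first, then content length plus last-modified) follow the diff.

    use std::collections::BTreeMap;

    // Simplified stand-ins for the metadata types used in the diff below.
    #[derive(Debug, Clone)]
    struct StageFile {
        path: String,
        size: u64,
        etag: Option<String>,
        last_modified: i64,
    }

    #[derive(Debug, Clone)]
    struct TableCopiedFileInfo {
        etag: Option<String>,
        content_length: u64,
        last_modified: Option<i64>,
    }

    /// Return the files that still need to be copied, reusing metadata that was
    /// already fetched by the listing step instead of re-statting every file.
    fn files_to_copy<'a>(
        stage_files: &'a [StageFile],
        copied: &BTreeMap<String, TableCopiedFileInfo>,
    ) -> Vec<&'a StageFile> {
        stage_files
            .iter()
            .filter(|f| match copied.get(&f.path) {
                // Already copied with a matching etag: skip.
                Some(c) if c.etag.is_some() && c.etag == f.etag => false,
                // No etag recorded: fall back to size + last_modified.
                Some(c) if c.etag.is_none()
                    && c.content_length == f.size
                    && c.last_modified == Some(f.last_modified) => false,
                // Never copied, or metadata differs: copy it.
                _ => true,
            })
            .collect()
    }

    fn main() {
        let stage_files = vec![
            StageFile { path: "dir/a.csv".into(), size: 10, etag: Some("e1".into()), last_modified: 1 },
            StageFile { path: "dir/b.csv".into(), size: 20, etag: None, last_modified: 2 },
        ];
        let mut copied = BTreeMap::new();
        copied.insert("dir/a.csv".to_string(), TableCopiedFileInfo {
            etag: Some("e1".into()),
            content_length: 10,
            last_modified: Some(1),
        });

        // Only b.csv should remain: a.csv matches the recorded etag.
        println!("{:?}", files_to_copy(&stage_files, &copied));
    }

Running this prints only dir/b.csv, since dir/a.csv matches the recorded etag and is skipped.
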
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_common.rs
@@ -153,7 +153,7 @@ pub async fn stat_file(
let op = StageTable::get_op(&table_ctx, stage)?;
let meta = op.object(path).metadata().await?;
Ok(StageFile {
path: path.to_owned(),
path: path.to_string(),
size: meta.content_length(),
md5: meta.content_md5().map(str::to_string),
last_modified: meta
100 changes: 49 additions & 51 deletions src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -23,6 +23,7 @@ use common_exception::Result;
use common_meta_app::schema::GetTableCopiedFileReq;
use common_meta_app::schema::TableCopiedFileInfo;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_meta_types::StageFile;
use common_meta_types::UserStageInfo;
use common_planner::stage_table::StageTableInfo;
use common_planner::ReadDataSourcePlan;
@@ -55,7 +56,7 @@ impl CopyInterpreterV2 {
Ok(CopyInterpreterV2 { ctx, plan })
}

async fn do_query_copied_files_info(
async fn get_copied_files_info(
&self,
catalog_name: String,
database_name: String,
@@ -98,14 +99,13 @@ impl CopyInterpreterV2 {
Ok(())
}

async fn filter_duplicate_files(
async fn filter_have_copied_files(
&self,
force: bool,
table_info: &StageTableInfo,
catalog_name: &str,
database_name: &str,
table_name: &str,
files: &[String],
stage_files: &[StageFile],
) -> Result<(u64, BTreeMap<String, TableCopiedFileInfo>)> {
let catalog = self.ctx.get_catalog(catalog_name)?;
let tenant = self.ctx.get_tenant();
@@ -118,39 +118,43 @@ impl CopyInterpreterV2 {

if !force {
// if force is false, copy only the files that unmatch to the meta copied files info.
let mut file_info = BTreeMap::new();

let files = stage_files
.iter()
.map(|v| v.path.clone())
.collect::<Vec<_>>();
let mut copied_files = BTreeMap::new();
for query_copied_files in files.chunks(MAX_QUERY_COPIED_FILES_NUM) {
self.do_query_copied_files_info(
self.get_copied_files_info(
catalog_name.to_string(),
database_name.to_string(),
table_id,
query_copied_files,
&mut file_info,
&mut copied_files,
)
.await?;
}

for file in files.iter() {
let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;

if let Some(file_info) = file_info.get(file) {
match &file_info.etag {
for stage_file in stage_files {
if let Some(copied_file) = copied_files.get(&stage_file.path) {
match &copied_file.etag {
Some(_etag) => {
// No need to copy the file again if etag is_some and match.
if stage_file.etag == file_info.etag {
tracing::warn!("ignore copy file {:?} matched by etag", file);
if stage_file.etag == copied_file.etag {
tracing::warn!(
"ignore copy file {:?} matched by etag",
copied_file
);
continue;
}
}
None => {
// etag is none, compare with content_length and last_modified.
if file_info.content_length == stage_file.size
&& file_info.last_modified == Some(stage_file.last_modified)
if copied_file.content_length == stage_file.size
&& copied_file.last_modified == Some(stage_file.last_modified)
{
tracing::warn!(
"ignore copy file {:?} matched by content_length and last_modified",
file
copied_file
);
continue;
}
@@ -159,18 +163,16 @@ impl CopyInterpreterV2 {
}

// unmatch case: insert into file map for copy.
file_map.insert(file.clone(), TableCopiedFileInfo {
file_map.insert(stage_file.path.clone(), TableCopiedFileInfo {
etag: stage_file.etag.clone(),
content_length: stage_file.size,
last_modified: Some(stage_file.last_modified),
});
}
} else {
// if force is true, copy all the file.
for file in files.iter() {
let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;

file_map.insert(file.clone(), TableCopiedFileInfo {
for stage_file in stage_files {
file_map.insert(stage_file.path.clone(), TableCopiedFileInfo {
etag: stage_file.etag.clone(),
content_length: stage_file.size,
last_modified: Some(stage_file.last_modified),
Expand Down Expand Up @@ -426,47 +428,43 @@ impl Interpreter for CopyInterpreterV2 {
SourceInfo::StageSource(table_info) => {
let path = &table_info.path;

// Here we add the path to the file: /path/to/path/file1.
let mut files = if !files.is_empty() {
let mut files_with_path = vec![];
let mut stage_files = if !files.is_empty() {
let mut res = vec![];

for file in files {
let new_path = Path::new(path).join(file);
files_with_path.push(new_path.to_string_lossy().to_string());
// Here we add the path to the file: /path/to/path/file1.
let new_path = Path::new(path).join(file).to_string_lossy().to_string();
let info =
stat_file(&self.ctx, &table_info.stage_info, &new_path).await?;
res.push(info);
}
files_with_path
res
} else {
let stage_files =
list_files(&self.ctx.clone(), &table_info.stage_info, path).await?;

// TODO(@xuanwo): Reuse existing metadata in StageFile.
stage_files.into_iter().map(|v| v.path).collect()
list_files(&self.ctx.clone(), &table_info.stage_info, path).await?
};

tracing::debug!("listed files: {:?}", &files);

// Pattern match check.
let pattern = &pattern;
if !pattern.is_empty() {
let regex = Regex::new(pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
pattern, e
))
})?;

files.retain(|file| regex.is_match(file));
{
let pattern = &pattern;
if !pattern.is_empty() {
let regex = Regex::new(pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
pattern, e
))
})?;
stage_files.retain(|v| regex.is_match(&v.path));
}
tracing::debug!("matched files: {:?}, pattern: {}", stage_files, pattern);
}

tracing::debug!("matched files: {:?}, pattern: {}", &files, pattern);

let (table_id, copy_stage_files) = self
.filter_duplicate_files(
.filter_have_copied_files(
*force,
table_info,
catalog_name,
database_name,
table_name,
&files,
&stage_files,
)
.await?;

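
One detail worth calling out from the hunks above: the non-force branch still has to ask the meta service which files were already copied, and it does so in bounded batches via files.chunks(MAX_QUERY_COPIED_FILES_NUM), merging each batch's answer into one BTreeMap. The following sketch mimics that batching pattern; the constant value, the in-process get_copied_files_info helper, and the file names are made up for illustration and do not reflect the real meta-service API (which would issue a GetTableCopiedFileReq per batch).

    use std::collections::BTreeMap;

    // Hypothetical record of an already-copied file; the real type lives in common_meta_app.
    #[derive(Debug, Clone)]
    struct TableCopiedFileInfo {
        etag: Option<String>,
        content_length: u64,
        last_modified: Option<i64>,
    }

    // Stand-in for the per-chunk lookup; a real implementation would query the
    // meta service for `table_id` and the given file names.
    fn get_copied_files_info(
        _table_id: u64,
        files: &[String],
        out: &mut BTreeMap<String, TableCopiedFileInfo>,
    ) {
        for f in files {
            // Pretend the meta service has records for files ending in "0.csv".
            if f.ends_with("0.csv") {
                out.insert(f.clone(), TableCopiedFileInfo {
                    etag: None,
                    content_length: 0,
                    last_modified: None,
                });
            }
        }
    }

    const MAX_QUERY_COPIED_FILES_NUM: usize = 50;

    fn main() {
        let files: Vec<String> = (0..120).map(|i| format!("dir/{i}.csv")).collect();

        // Query the copied-file records in bounded batches and merge them into one map,
        // mirroring the chunked loop in filter_have_copied_files above.
        let mut copied_files = BTreeMap::new();
        for chunk in files.chunks(MAX_QUERY_COPIED_FILES_NUM) {
            get_copied_files_info(1, chunk, &mut copied_files);
        }

        println!("copied records found: {}", copied_files.len());
    }

Bounding the batch size presumably keeps each meta-service request small even when a single COPY statement touches thousands of files.
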

1 comment on commit 44b2779


vercel bot commented on 44b2779 on Nov 1, 2022


Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app
databend-databend.vercel.app
