Skip to content

Commit

Permalink
reuse the StageFile for filter copied files
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Nov 1, 2022
1 parent 7ac62eb commit a6371c8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 102 deletions.
57 changes: 7 additions & 50 deletions src/query/service/src/interpreters/interpreter_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ use std::sync::Arc;

use chrono::TimeZone;
use chrono::Utc;
use common_base::base::tokio::sync::Semaphore;
use common_base::base::GlobalIORuntime;
use common_base::base::Runtime;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::GrantObject;
use common_meta_types::StageFile;
use common_meta_types::UserStageInfo;
use common_pipeline_core::Pipeline;
use futures::TryStreamExt;
use futures_util::future;
use tracing::debug;
use tracing::warn;
use tracing::Instrument;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
Expand Down Expand Up @@ -149,16 +144,16 @@ pub async fn validate_grant_object_exists(
Ok(())
}

async fn stat_file(
ctx: Arc<QueryContext>,
stage: UserStageInfo,
path: String,
pub async fn stat_file(
ctx: &Arc<QueryContext>,
stage: &UserStageInfo,
path: &str,
) -> Result<StageFile> {
let table_ctx: Arc<dyn TableContext> = ctx.clone();
let op = StageTable::get_op(&table_ctx, &stage)?;
let meta = op.object(&path).metadata().await?;
let op = StageTable::get_op(&table_ctx, stage)?;
let meta = op.object(path).metadata().await?;
Ok(StageFile {
path,
path: path.to_string(),
size: meta.content_length(),
md5: meta.content_md5().map(str::to_string),
last_modified: meta
Expand All @@ -169,44 +164,6 @@ async fn stat_file(
})
}

/// Stat a batch of stage files in parallel.
///
/// For every location yielded by `files`, a `stat_file` future is created and
/// handed to a runtime that is built just for this call.
///
/// The outer `Result` is `Err` when reading the `max_threads` setting,
/// building the runtime, spawning the batch, or joining the spawned tasks
/// fails. On success the vector holds one inner `Result<StageFile>` per
/// spawned task, i.e. the outcome of each individual `stat_file` call, so a
/// single file failing to stat does not fail the whole batch.
pub async fn stat_files(
ctx: &Arc<QueryContext>,
stage: &UserStageInfo,
files: impl IntoIterator<Item = impl AsRef<str>>,
) -> Result<Vec<Result<StageFile>>> {
// 1.1 combine all the tasks.
// The iterator is lazy: a `stat_file` future is only created when the
// consumer pulls the next item. The context handle and stage info are cloned
// and the location is copied into an owned `String`, so each future owns
// everything it needs.
let mut iter = files.into_iter();
let tasks = std::iter::from_fn(move || {
let ctx = ctx.clone();
if let Some(location) = iter.next() {
let location = location.as_ref().to_owned();
Some(
stat_file(ctx, stage.clone(), location)
// Wrap each stat call in its own debug-level tracing span.
.instrument(tracing::debug_span!("stat_file")),
)
} else {
None
}
});

// 1.2 build the runtime.
// The `max_threads` setting is used both as the semaphore permit count and
// as the number of worker threads of the dedicated runtime.
// NOTE(review): presumably the semaphore caps how many stat tasks run at
// once; `try_spawn_batch` is not visible here, confirm.
let max_runtime_threads = ctx.get_settings().get_max_threads()? as usize;
let semaphore = Semaphore::new(max_runtime_threads);
let stat_runtime = Arc::new(Runtime::with_worker_threads(
max_runtime_threads,
Some("stat-files-worker".to_owned()),
)?);

// 1.3 spawn all the tasks to the runtime.
let join_handlers = stat_runtime.try_spawn_batch(semaphore, tasks).await?;

// 1.4 get all the result.
// A failure to join any spawned task is reported as a `LogicalError`; the
// per-file stat errors stay inside the returned vector.
future::try_join_all(join_handlers)
.await
.map_err(|e| ErrorCode::LogicalError(format!("Stat files in parallel failure, {}", e)))
}

/// List files from DAL in recursive way.
///
/// - If input path is a dir, we will list it recursively.
Expand Down
102 changes: 50 additions & 52 deletions src/query/service/src/interpreters/interpreter_copy_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_exception::Result;
use common_meta_app::schema::GetTableCopiedFileReq;
use common_meta_app::schema::TableCopiedFileInfo;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_meta_types::StageFile;
use common_meta_types::UserStageInfo;
use common_planner::stage_table::StageTableInfo;
use common_planner::ReadDataSourcePlan;
Expand All @@ -32,7 +33,7 @@ use regex::Regex;
use super::append2table;
use crate::catalogs::Catalog;
use crate::interpreters::interpreter_common::list_files;
use crate::interpreters::interpreter_common::stat_files;
use crate::interpreters::interpreter_common::stat_file;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreterV2;
use crate::pipelines::PipelineBuildResult;
Expand All @@ -42,7 +43,7 @@ use crate::sql::plans::CopyPlanV2;
use crate::sql::plans::Plan;
use crate::storages::stage::StageTable;

const MAX_QUERY_COPIED_FILES_NUM: usize = 50;
const MAX_QUERY_COPIED_FILES_NUM: usize = 100;
const TABLE_COPIED_FILE_KEY_EXPIRE_AFTER_DAYS: Option<u64> = Some(7);
pub struct CopyInterpreterV2 {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -98,14 +99,13 @@ impl CopyInterpreterV2 {
Ok(())
}

async fn filter_duplicate_files(
async fn filter_have_copied_files(
&self,
force: bool,
table_info: &StageTableInfo,
catalog_name: &str,
database_name: &str,
table_name: &str,
files: &[String],
stage_file_infos: &[StageFile],
) -> Result<(u64, BTreeMap<String, TableCopiedFileInfo>)> {
let catalog = self.ctx.get_catalog(catalog_name)?;
let tenant = self.ctx.get_tenant();
Expand All @@ -118,26 +118,28 @@ impl CopyInterpreterV2 {

if !force {
// If force is false, copy only the files that do not match the copied-file info recorded in meta.
let mut file_info = BTreeMap::new();

let files = stage_file_infos
.iter()
.map(|v| v.path.clone())
.collect::<Vec<_>>();
let mut copied_file_infos = BTreeMap::new();
for query_copied_files in files.chunks(MAX_QUERY_COPIED_FILES_NUM) {
self.get_copied_files_info(
catalog_name.to_string(),
database_name.to_string(),
table_id,
query_copied_files,
&mut file_info,
&mut copied_file_infos,
)
.await?;
}

let stat_infos = stat_files(&self.ctx, &table_info.stage_info, files).await?;
for stat_info in stat_infos.into_iter().flatten() {
if let Some(copied_file_info) = file_info.get(&stat_info.path) {
for stage_file_info in stage_file_infos {
if let Some(copied_file_info) = copied_file_infos.get(&stage_file_info.path) {
match &copied_file_info.etag {
Some(_etag) => {
// No need to copy the file again if etag is_some and match.
if stat_info.etag == copied_file_info.etag {
if stage_file_info.etag == copied_file_info.etag {
tracing::warn!(
"ignore copy file {:?} matched by etag",
copied_file_info
Expand All @@ -147,8 +149,9 @@ impl CopyInterpreterV2 {
}
None => {
// etag is none, compare with content_length and last_modified.
if copied_file_info.content_length == stat_info.size
&& copied_file_info.last_modified == Some(stat_info.last_modified)
if copied_file_info.content_length == stage_file_info.size
&& copied_file_info.last_modified
== Some(stage_file_info.last_modified)
{
tracing::warn!(
"ignore copy file {:?} matched by content_length and last_modified",
Expand All @@ -161,20 +164,19 @@ impl CopyInterpreterV2 {
}

// No match: insert the file into the file map so that it gets copied.
file_map.insert(stat_info.path.clone(), TableCopiedFileInfo {
etag: stat_info.etag.clone(),
content_length: stat_info.size,
last_modified: Some(stat_info.last_modified),
file_map.insert(stage_file_info.path.clone(), TableCopiedFileInfo {
etag: stage_file_info.etag.clone(),
content_length: stage_file_info.size,
last_modified: Some(stage_file_info.last_modified),
});
}
} else {
// If force is true, copy all the files.
let stat_infos = stat_files(&self.ctx, &table_info.stage_info, files).await?;
for stat_info in stat_infos.into_iter().flatten() {
file_map.insert(stat_info.path, TableCopiedFileInfo {
etag: stat_info.etag.clone(),
content_length: stat_info.size,
last_modified: Some(stat_info.last_modified),
for stage_file_info in stage_file_infos {
file_map.insert(stage_file_info.path.clone(), TableCopiedFileInfo {
etag: stage_file_info.etag.clone(),
content_length: stage_file_info.size,
last_modified: Some(stage_file_info.last_modified),
});
}
}
Expand Down Expand Up @@ -427,47 +429,43 @@ impl Interpreter for CopyInterpreterV2 {
SourceInfo::StageSource(table_info) => {
let path = &table_info.path;

// Here we add the path to the file: /path/to/path/file1.
let mut files = if !files.is_empty() {
let mut files_with_path = vec![];
let mut stage_infos = if !files.is_empty() {
let mut res = vec![];

for file in files {
let new_path = Path::new(path).join(file);
files_with_path.push(new_path.to_string_lossy().to_string());
// Here we add the path to the file: /path/to/path/file1.
let new_path = Path::new(path).join(file).to_string_lossy().to_string();
let info =
stat_file(&self.ctx, &table_info.stage_info, &new_path).await?;
res.push(info);
}
files_with_path
res
} else {
let stage_files =
list_files(&self.ctx.clone(), &table_info.stage_info, path).await?;

// TODO(@xuanwo): Reuse existing metadata in StageFile.
stage_files.into_iter().map(|v| v.path).collect()
list_files(&self.ctx.clone(), &table_info.stage_info, path).await?
};

tracing::debug!("listed files: {:?}", &files);

// Pattern match check.
let pattern = &pattern;
if !pattern.is_empty() {
let regex = Regex::new(pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
pattern, e
))
})?;

files.retain(|file| regex.is_match(file));
{
let pattern = &pattern;
if !pattern.is_empty() {
let regex = Regex::new(pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
pattern, e
))
})?;
stage_infos.retain(|v| regex.is_match(&v.path));
}
tracing::debug!("matched files: {:?}, pattern: {}", stage_infos, pattern);
}

tracing::debug!("matched files: {:?}, pattern: {}", &files, pattern);

let (table_id, copy_stage_files) = self
.filter_duplicate_files(
.filter_have_copied_files(
*force,
table_info,
catalog_name,
database_name,
table_name,
&files,
&stage_infos,
)
.await?;

Expand Down

0 comments on commit a6371c8

Please sign in to comment.