From 817575733715467e033bdf6113aded5ebc68f92c Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Tue, 20 Sep 2022 12:50:52 +0800 Subject: [PATCH] feat(query): return COPY INTO TABLE file status --- .../src/interpreters/interpreter_copy_v2.rs | 152 ++++++++++-------- 1 file changed, 82 insertions(+), 70 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs index 811591719d42..bd5fc20591df 100644 --- a/src/query/service/src/interpreters/interpreter_copy_v2.rs +++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_base::base::GlobalIORuntime; use common_base::base::TrySpawn; +use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; @@ -30,6 +31,7 @@ use common_meta_app::schema::UpsertTableCopiedFileReq; use common_meta_types::UserStageInfo; use futures::TryStreamExt; use regex::Regex; +use tracing::error; use super::append2table; use crate::interpreters::interpreter_common::stat_file; @@ -69,15 +71,17 @@ impl CopyInterpreterV2 { .get_table(&tenant, database_name, table_name) .await?; let table_id = table.get_id(); - let req = GetTableCopiedFileReq { - table_id, - files: files.to_owned(), - }; - let mut file_map = BTreeMap::new(); + let mut file_map = BTreeMap::new(); if !force { // if force is false, copy only the files that unmatch to the meta copied files info. - let resp = catalog.get_table_copied_file_info(req).await?; + let resp = catalog + .get_table_copied_file_info(GetTableCopiedFileReq { + table_id, + files: files.to_owned(), + }) + .await?; + for file in files.iter() { let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?; @@ -86,7 +90,11 @@ impl CopyInterpreterV2 { Some(_etag) => { // No need to copy the file again if etag is_some and match. 
if stage_file.etag == file_info.etag { - tracing::warn!("ignore copy file {:?} matched by etag", file); + tracing::warn!( + "ignore copy file {:?} matched by etag: {:?}", + file, + file_info.etag + ); continue; } } @@ -263,61 +271,68 @@ impl CopyInterpreterV2 { from: &ReadDataSourcePlan, files: Vec<String>, ) -> Result<PipelineBuildResult> { - let mut build_res = PipelineBuildResult::create(); - - let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files); - tracing::info!("copy_files_to_table from source: {:?}", read_source_plan); - - let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?; - from_table.read2( - self.ctx.clone(), - &read_source_plan, - &mut build_res.main_pipeline, - )?; - - let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?; - - to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline, false)?; - - let ctx = self.ctx.clone(); - let catalog_name = catalog_name.clone(); - let files = files.clone(); - let from = from.clone(); - - build_res.main_pipeline.set_on_finished(move |may_error| { - if may_error.is_none() { - // capture out variable - let ctx = ctx.clone(); - let files = files.clone(); - let from = from.clone(); - let catalog_name = catalog_name.clone(); - let to_table = to_table.clone(); - - let task = GlobalIORuntime::instance().spawn(async move { - // Commit - let operations = ctx.consume_precommit_blocks(); - to_table - .commit_insertion(ctx.clone(), &catalog_name, operations, false) - .await?; - - // Purge - CopyInterpreterV2::purge_files(ctx, &from, &files).await - }); + if !files.is_empty() { + let mut build_res = PipelineBuildResult::create(); + let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files); + tracing::info!("copy_files_to_table from source: {:?}", read_source_plan); + + let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?; + from_table.read2( + self.ctx.clone(), + &read_source_plan, + &mut build_res.main_pipeline, + )?; + + let to_table = 
self.ctx.get_table(catalog_name, db_name, tbl_name).await?; + to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline, false)?; + + let ctx = self.ctx.clone(); + let catalog_name = catalog_name.clone(); + let files = files.clone(); + let from = from.clone(); + + build_res.main_pipeline.set_on_finished(move |may_error| { + if may_error.is_none() { + // capture out variable + let ctx = ctx.clone(); + let files = files.clone(); + let from = from.clone(); + let catalog_name = catalog_name.clone(); + let to_table = to_table.clone(); + + let task = GlobalIORuntime::instance().spawn(async move { + // Commit + let operations = ctx.consume_precommit_blocks(); + to_table + .commit_insertion(ctx.clone(), &catalog_name, operations, false) + .await?; - return match futures::executor::block_on(task) { - Ok(Ok(_)) => Ok(()), - Ok(Err(error)) => Err(error), - Err(cause) => Err(ErrorCode::PanicError(format!( - "Maybe panic while in commit insert. {}", - cause - ))), - }; - } + // Purge + CopyInterpreterV2::purge_files(ctx, &from, &files).await + }); + + return match futures::executor::block_on(task) { + Ok(Ok(_)) => Ok(()), + Ok(Err(error)) => Err(error), + Err(cause) => Err(ErrorCode::PanicError(format!( + "Maybe panic while in commit insert. {}", + cause + ))), + }; + } - Err(may_error.as_ref().unwrap().clone()) - }); + Err(may_error.as_ref().unwrap().clone()) + }); + } - Ok(build_res) + // Copy status. 
+ let schema = Arc::new(DataSchema::new(vec![DataField::new( + "File", + Vu8::to_data_type(), + )])); + PipelineBuildResult::from_blocks(vec![DataBlock::create(schema, vec![Series::from_data( + files, + )])]) } async fn execute_copy_into_stage( @@ -433,14 +448,7 @@ impl Interpreter for CopyInterpreterV2 { ) .await?; - tracing::info!( - "matched copy unduplicate files: {:?}", - &copy_stage_files.keys(), - ); - - if copy_stage_files.is_empty() { - return Ok(PipelineBuildResult::create()); - } + tracing::info!("need copy files: {:?}", &copy_stage_files.keys(),); let result = self .copy_files_to_table( @@ -453,13 +461,17 @@ impl Interpreter for CopyInterpreterV2 { .await; if result.is_ok() { - let _ = self - .upsert_copied_files_info(catalog_name, table_id, copy_stage_files) - .await?; + self.upsert_copied_files_info(catalog_name, table_id, copy_stage_files) + .await + .map_err(|e| { + error!("upsert.copied.files.info error:{}", e); + e + })?; } result } + _other => { return self .copy_files_to_table(