diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs
index 811591719d42..6a84ad45cbed 100644
--- a/src/query/service/src/interpreters/interpreter_copy_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use common_base::base::GlobalIORuntime;
 use common_base::base::TrySpawn;
+use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -30,6 +31,9 @@ use common_meta_app::schema::UpsertTableCopiedFileReq;
 use common_meta_types::UserStageInfo;
 use futures::TryStreamExt;
 use regex::Regex;
+use tracing::error;
+use tracing::info;
+use tracing::warn;
 
 use super::append2table;
 use crate::interpreters::interpreter_common::stat_file;
@@ -69,15 +73,17 @@ impl CopyInterpreterV2 {
             .get_table(&tenant, database_name, table_name)
             .await?;
         let table_id = table.get_id();
-        let req = GetTableCopiedFileReq {
-            table_id,
-            files: files.to_owned(),
-        };
-        let mut file_map = BTreeMap::new();
 
+        let mut file_map = BTreeMap::new();
         if !force {
             // if force is false, copy only the files that unmatch to the meta copied files info.
-            let resp = catalog.get_table_copied_file_info(req).await?;
+            let resp = catalog
+                .get_table_copied_file_info(GetTableCopiedFileReq {
+                    table_id,
+                    files: files.to_owned(),
+                })
+                .await?;
+
             for file in files.iter() {
                 let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;
 
@@ -86,7 +92,10 @@ impl CopyInterpreterV2 {
                     Some(_etag) => {
                         // No need to copy the file again if etag is_some and match.
                         if stage_file.etag == file_info.etag {
-                            tracing::warn!("ignore copy file {:?} matched by etag", file);
+                            warn!(
+                                "ignore copy file {:?} matched by etag: {:?}",
+                                file, file_info.etag
+                            );
                             continue;
                         }
                     }
@@ -95,7 +104,7 @@
                         if file_info.content_length == stage_file.size
                             && file_info.last_modified == Some(stage_file.last_modified)
                         {
-                            tracing::warn!(
+                            warn!(
                                 "ignore copy file {:?} matched by content_length and last_modified",
                                 file
                             );
@@ -133,7 +142,7 @@ impl CopyInterpreterV2 {
         table_id: u64,
         copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
     ) -> Result<()> {
-        tracing::info!("upsert_copied_files_info: {:?}", copy_stage_files);
+        info!("upsert_copied_files_info: {:?}", copy_stage_files);
 
         if !copy_stage_files.is_empty() {
             let req = UpsertTableCopiedFileReq {
@@ -199,7 +208,7 @@ impl CopyInterpreterV2 {
             list.into_iter().collect::<Vec<String>>()
         };
 
-        tracing::info!("listed files: {:?}", &files_with_path);
+        info!("listed files: {:?}", &files_with_path);
 
         Ok(files_with_path)
     }
@@ -222,10 +231,10 @@ impl CopyInterpreterV2 {
            let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;
            for f in files {
                if let Err(e) = op.object(f).delete().await {
-                   tracing::error!("Failed to delete file: {}, error: {}", f, e);
+                   error!("Failed to delete file: {}, error: {}", f, e);
                }
            }
-           tracing::info!("purge files: {:?}", files);
+           info!("purge files: {:?}", files);
        }
        Ok(())
    }
@@ -263,61 +272,68 @@ impl CopyInterpreterV2 {
         from: &ReadDataSourcePlan,
         files: Vec<String>,
     ) -> Result<PipelineBuildResult> {
-        let mut build_res = PipelineBuildResult::create();
-
-        let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
-        tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);
-
-        let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
-        from_table.read2(
-            self.ctx.clone(),
-            &read_source_plan,
-            &mut build_res.main_pipeline,
-        )?;
-
-        let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
-
-        to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline, false)?;
-
-        let ctx = self.ctx.clone();
-        let catalog_name = catalog_name.clone();
-        let files = files.clone();
-        let from = from.clone();
-
-        build_res.main_pipeline.set_on_finished(move |may_error| {
-            if may_error.is_none() {
-                // capture out variable
-                let ctx = ctx.clone();
-                let files = files.clone();
-                let from = from.clone();
-                let catalog_name = catalog_name.clone();
-                let to_table = to_table.clone();
-
-                let task = GlobalIORuntime::instance().spawn(async move {
-                    // Commit
-                    let operations = ctx.consume_precommit_blocks();
-                    to_table
-                        .commit_insertion(ctx.clone(), &catalog_name, operations, false)
-                        .await?;
-
-                    // Purge
-                    CopyInterpreterV2::purge_files(ctx, &from, &files).await
-                });
+        if !files.is_empty() {
+            let mut build_res = PipelineBuildResult::create();
+            let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
+            info!("copy_files_to_table from source: {:?}", read_source_plan);
+
+            let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
+            from_table.read2(
+                self.ctx.clone(),
+                &read_source_plan,
+                &mut build_res.main_pipeline,
+            )?;
+
+            let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
+            to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline, false)?;
+
+            let ctx = self.ctx.clone();
+            let catalog_name = catalog_name.clone();
+            let files = files.clone();
+            let from = from.clone();
+
+            build_res.main_pipeline.set_on_finished(move |may_error| {
+                if may_error.is_none() {
+                    // capture out variable
+                    let ctx = ctx.clone();
+                    let files = files.clone();
+                    let from = from.clone();
+                    let catalog_name = catalog_name.clone();
+                    let to_table = to_table.clone();
+
+                    let task = GlobalIORuntime::instance().spawn(async move {
+                        // Commit
+                        let operations = ctx.consume_precommit_blocks();
+                        to_table
+                            .commit_insertion(ctx.clone(), &catalog_name, operations, false)
+                            .await?;
-
-                return match futures::executor::block_on(task) {
-                    Ok(Ok(_)) => Ok(()),
-                    Ok(Err(error)) => Err(error),
-                    Err(cause) => Err(ErrorCode::PanicError(format!(
-                        "Maybe panic while in commit insert. {}",
-                        cause
-                    ))),
-                };
-            }
+
+                        // Purge
+                        CopyInterpreterV2::purge_files(ctx, &from, &files).await
+                    });
+
+                    return match futures::executor::block_on(task) {
+                        Ok(Ok(_)) => Ok(()),
+                        Ok(Err(error)) => Err(error),
+                        Err(cause) => Err(ErrorCode::PanicError(format!(
+                            "Maybe panic while in commit insert. {}",
+                            cause
+                        ))),
+                    };
+                }
-
-            Err(may_error.as_ref().unwrap().clone())
-        });
+
+                Err(may_error.as_ref().unwrap().clone())
+            });
+        }
-
-        Ok(build_res)
+
+        // Copy status.
+        let schema = Arc::new(DataSchema::new(vec![DataField::new(
+            "File",
+            Vu8::to_data_type(),
+        )]));
+        PipelineBuildResult::from_blocks(vec![DataBlock::create(schema, vec![Series::from_data(
+            files,
+        )])])
     }
 
     async fn execute_copy_into_stage(
@@ -418,7 +434,7 @@ impl Interpreter for CopyInterpreterV2 {
                     files = matched_files;
                 }
 
-                info!("matched files: {:?}, pattern: {}", &files, pattern);
+                info!("Copy matched files: {:?}, pattern: {}", &files, pattern);
 
                 match &from.source_info {
                     SourceInfo::StageSource(table_info) => {
@@ -433,14 +449,7 @@
                            )
                            .await?;
 
-                        tracing::info!(
-                            "matched copy unduplicate files: {:?}",
-                            &copy_stage_files.keys(),
-                        );
-
-                        if copy_stage_files.is_empty() {
-                            return Ok(PipelineBuildResult::create());
-                        }
+                        info!("Copy files list: {:?}", &copy_stage_files.keys(),);
 
                         let result = self
                             .copy_files_to_table(
@@ -453,13 +462,17 @@
                            .await;
 
                        if result.is_ok() {
-                            let _ = self
-                                .upsert_copied_files_info(catalog_name, table_id, copy_stage_files)
-                                .await?;
+                            self.upsert_copied_files_info(catalog_name, table_id, copy_stage_files)
+                                .await
+                                .map_err(|e| {
+                                    error!("Upsert copied_files_info error:{}", e);
+                                    e
+                                })?;
                        }
 
                        result
                    }
+
                    _other => {
                        return self
                            .copy_files_to_table(
diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
index af52128edeaf..9259e3dd5fe4 100644
--- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
+++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
@@ -65,6 +65,7 @@ fn has_result_set_by_plan(plan: &Plan) -> bool {
         Plan::Query { .. }
             | Plan::Explain { .. }
             | Plan::Call(_)
+            | Plan::Copy(_)
             | Plan::ShowCreateDatabase(_)
             | Plan::ShowCreateTable(_)
             | Plan::DescShare(_)
diff --git a/src/query/service/src/servers/mysql/writers/query_result_writer.rs b/src/query/service/src/servers/mysql/writers/query_result_writer.rs
index 44277d6e2036..1fe871004cdf 100644
--- a/src/query/service/src/servers/mysql/writers/query_result_writer.rs
+++ b/src/query/service/src/servers/mysql/writers/query_result_writer.rs
@@ -31,6 +31,7 @@ use common_streams::DataBlockStream;
 use common_streams::SendableDataBlockStream;
 use futures_util::StreamExt;
 use opensrv_mysql::*;
+use tracing::debug;
 use tracing::error;
 
 /// Reports progress information as string, intend to be put into the mysql Ok packet.
@@ -185,6 +186,9 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> {
             }
             Ok(block) => block,
         };
+
+        debug!("mysql.query.result.writer.block: {:?}", block);
+
         match block.get_serializers() {
             Ok(serializers) => {
                 let rows_size = block.column(0).len();
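
Note on the dedup check (illustration, not part of the patch): with force disabled, the first interpreter_copy_v2.rs hunks skip any staged file that already matches the table's copied-files meta, preferring an etag comparison and falling back to content_length plus last_modified. A minimal standalone sketch of that per-file decision follows; the names StageFileStat, CopiedFileInfo, and already_copied are invented stand-ins for the in-tree types, and the exact match scrutinee in the real code is not fully visible in the hunks above:

    // Sketch, assuming chrono timestamps; fields mirror what the hunks read.
    use chrono::{DateTime, Utc};

    struct StageFileStat {
        etag: Option<String>,
        size: u64,
        last_modified: DateTime<Utc>,
    }

    struct CopiedFileInfo {
        etag: Option<String>,
        content_length: u64,
        last_modified: Option<DateTime<Utc>>,
    }

    /// True when the staged file matches the meta record, so COPY may skip it.
    fn already_copied(stage: &StageFileStat, meta: &CopiedFileInfo) -> bool {
        match &meta.etag {
            // An etag was recorded: skip only on an exact etag match.
            Some(_etag) => stage.etag == meta.etag,
            // No etag recorded: fall back to size plus last-modified time.
            None => {
                meta.content_length == stage.size
                    && meta.last_modified == Some(stage.last_modified)
            }
        }
    }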
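
Note on the result-set change (illustration, not part of the patch): the `// Copy status.` tail of copy_files_to_table now always returns a one-column block listing the files the COPY statement processed, even when `files` is empty, and the new `Plan::Copy(_)` arm in has_result_set_by_plan makes the MySQL handler render that block as a result set instead of a bare OK packet; this is also why the early return on an empty copy_stage_files list was dropped. A sketch of how such a status block is assembled, using the same common_datablocks/common_datavalues calls the patch uses; the helper name copy_status_block is hypothetical, since the patch inlines this logic:

    // Sketch only: build the single-column "File" status block for COPY.
    use std::sync::Arc;

    use common_datablocks::DataBlock;
    use common_datavalues::prelude::*;

    /// Hypothetical helper; the patch writes this inline at the end of
    /// copy_files_to_table.
    fn copy_status_block(files: Vec<String>) -> DataBlock {
        // One string column named "File"; Vu8 (Vec<u8>) is how
        // common_datavalues spells the string type.
        let schema = Arc::new(DataSchema::new(vec![DataField::new(
            "File",
            Vu8::to_data_type(),
        )]));
        // One row per file the COPY statement processed.
        DataBlock::create(schema, vec![Series::from_data(files)])
    }

Because the block is produced outside the `if !files.is_empty()` guard, clients always receive the File column header, even for a no-op COPY.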