Skip to content

Commit

Permalink
feat(query): return COPY INTO TABLE file status
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Sep 20, 2022
1 parent 985d6f8 commit f805174
Showing 1 changed file with 68 additions and 58 deletions.
126 changes: 68 additions & 58 deletions src/query/service/src/interpreters/interpreter_copy_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use common_base::base::GlobalIORuntime;
use common_base::base::TrySpawn;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -69,15 +70,17 @@ impl CopyInterpreterV2 {
.get_table(&tenant, database_name, table_name)
.await?;
let table_id = table.get_id();
let req = GetTableCopiedFileReq {
table_id,
files: files.to_owned(),
};
let mut file_map = BTreeMap::new();

let mut file_map = BTreeMap::new();
if !force {
// if force is false, copy only the files that do not match the meta's copied-files info.
let resp = catalog.get_table_copied_file_info(req).await?;
let resp = catalog
.get_table_copied_file_info(GetTableCopiedFileReq {
table_id,
files: files.to_owned(),
})
.await?;

for file in files.iter() {
let stage_file = stat_file(&self.ctx, &table_info.stage_info, file).await?;

Expand Down Expand Up @@ -263,61 +266,68 @@ impl CopyInterpreterV2 {
from: &ReadDataSourcePlan,
files: Vec<String>,
) -> Result<PipelineBuildResult> {
let mut build_res = PipelineBuildResult::create();

let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);

let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
from_table.read2(
self.ctx.clone(),
&read_source_plan,
&mut build_res.main_pipeline,
)?;

let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline, false)?;

let ctx = self.ctx.clone();
let catalog_name = catalog_name.clone();
let files = files.clone();
let from = from.clone();

build_res.main_pipeline.set_on_finished(move |may_error| {
if may_error.is_none() {
// capture out variable
let ctx = ctx.clone();
let files = files.clone();
let from = from.clone();
let catalog_name = catalog_name.clone();
let to_table = to_table.clone();

let task = GlobalIORuntime::instance().spawn(async move {
// Commit
let operations = ctx.consume_precommit_blocks();
to_table
.commit_insertion(ctx.clone(), &catalog_name, operations, false)
.await?;

// Purge
CopyInterpreterV2::purge_files(ctx, &from, &files).await
});
if !files.is_empty() {
let mut build_res = PipelineBuildResult::create();
let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
tracing::info!("copy_files_to_table from source: {:?}", read_source_plan);

let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?;
from_table.read2(
self.ctx.clone(),
&read_source_plan,
&mut build_res.main_pipeline,
)?;

let to_table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;
to_table.append2(self.ctx.clone(), &mut build_res.main_pipeline, false)?;

let ctx = self.ctx.clone();
let catalog_name = catalog_name.clone();
let files = files.clone();
let from = from.clone();

build_res.main_pipeline.set_on_finished(move |may_error| {
if may_error.is_none() {
// capture the outer variables for the async move block
let ctx = ctx.clone();
let files = files.clone();
let from = from.clone();
let catalog_name = catalog_name.clone();
let to_table = to_table.clone();

let task = GlobalIORuntime::instance().spawn(async move {
// Commit
let operations = ctx.consume_precommit_blocks();
to_table
.commit_insertion(ctx.clone(), &catalog_name, operations, false)
.await?;

return match futures::executor::block_on(task) {
Ok(Ok(_)) => Ok(()),
Ok(Err(error)) => Err(error),
Err(cause) => Err(ErrorCode::PanicError(format!(
"Maybe panic while in commit insert. {}",
cause
))),
};
}
// Purge
CopyInterpreterV2::purge_files(ctx, &from, &files).await
});

return match futures::executor::block_on(task) {
Ok(Ok(_)) => Ok(()),
Ok(Err(error)) => Err(error),
Err(cause) => Err(ErrorCode::PanicError(format!(
"Maybe panic while in commit insert. {}",
cause
))),
};
}

Err(may_error.as_ref().unwrap().clone())
});
Err(may_error.as_ref().unwrap().clone())
});
}

Ok(build_res)
// Copy status.
let schema = Arc::new(DataSchema::new(vec![DataField::new(
"File",
Vu8::to_data_type(),
)]));
PipelineBuildResult::from_blocks(vec![DataBlock::create(schema, vec![Series::from_data(
files,
)])])
}

async fn execute_copy_into_stage(
Expand Down

0 comments on commit f805174

Please sign in to comment.