Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(query): copy into stage support #5579

Merged
merged 7 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions common/io/src/buffer/buffer_read_datetime_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ where R: BufferRead
let mut buf = vec![0; DATE_LEN];
self.read_exact(buf.as_mut_slice())?;

let v = std::str::from_utf8(buf.as_slice())
.map_err_to_code(ErrorCode::BadBytes, || "Cannot convert value to utf8")?;
let v = std::str::from_utf8(buf.as_slice()).map_err_to_code(ErrorCode::BadBytes, || {
format!("Cannot convert value:{:?} to utf8", buf)
})?;
v.parse::<NaiveDate>()
.map_err_to_code(ErrorCode::BadBytes, || "Cannot parse value to Date type")
.map_err_to_code(ErrorCode::BadBytes, || {
format!("Cannot parse value:{} to Date type", v)
})
}

fn read_timestamp_text(&mut self, tz: &Tz) -> Result<DateTime<Tz>> {
Expand Down
1 change: 1 addition & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub use plan_aggregator_final::AggregatorFinalPlan;
pub use plan_aggregator_partial::AggregatorPartialPlan;
pub use plan_broadcast::BroadcastPlan;
pub use plan_call::CallPlan;
pub use plan_copy::CopyMode;
pub use plan_copy::CopyPlan;
pub use plan_copy::ValidationMode;
pub use plan_database_create::CreateDatabasePlan;
Expand Down
75 changes: 58 additions & 17 deletions common/planners/src/plan_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::str::FromStr;
use common_datavalues::DataSchemaRef;
use common_meta_types::MetaId;

use crate::PlanNode;
use crate::ReadDataSourcePlan;
use crate::StageTableInfo;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone, Debug)]
pub enum ValidationMode {
Expand Down Expand Up @@ -54,34 +56,73 @@ impl FromStr for ValidationMode {

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)]
pub struct CopyPlan {
pub catalog_name: String,
pub db_name: String,
pub tbl_name: String,
pub tbl_id: MetaId,
pub schema: DataSchemaRef,
pub from: ReadDataSourcePlan,
pub copy_mode: CopyMode,

pub validation_mode: ValidationMode,
pub files: Vec<String>,
pub pattern: String,
}

/// CopyPlan supports CopyIntoTable & CopyIntoStage
// The two variants carry payloads of very different sizes; the clippy
// `large_enum_variant` lint is allowed deliberately rather than boxing.
#[allow(clippy::large_enum_variant)]
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)]
pub enum CopyMode {
    /// COPY INTO <table> FROM <source>: load staged/external files into a table.
    IntoTable {
        catalog_name: String,
        db_name: String,
        tbl_name: String,
        /// Id of the destination table (MetaId from the meta service).
        tbl_id: MetaId,
        /// Explicit list of files to copy; when empty, the source path is
        /// listed to discover files instead.
        files: Vec<String>,
        /// Regex used to filter the discovered files; empty means no filtering.
        pattern: String,
        /// Schema of the data being copied into the table.
        schema: DataSchemaRef,
        /// The source plan describing where/how to read the input files.
        from: ReadDataSourcePlan,
    },

    /// COPY INTO <stage> FROM <query>: unload a query result into a stage.
    IntoStage {
        /// Target stage the query output is written to.
        stage_table_info: StageTableInfo,
        /// The plan whose execution result is appended to the stage.
        query: Box<PlanNode>,
    },
}

impl CopyPlan {
pub fn schema(&self) -> DataSchemaRef {
self.schema.clone()
match &self.copy_mode {
CopyMode::IntoTable { schema, .. } => schema.clone(),
CopyMode::IntoStage {
stage_table_info, ..
} => stage_table_info.schema.clone(),
}
}
}

impl Debug for CopyPlan {
// Ignore the schema.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Copy into {:}.{:}", self.db_name, self.tbl_name)?;
write!(f, ", {:?}", self.from)?;
if !self.files.is_empty() {
write!(f, " ,files:{:?}", self.files)?;
}
if !self.pattern.is_empty() {
write!(f, " ,pattern:{:?}", self.pattern)?;
match &self.copy_mode {
CopyMode::IntoTable {
db_name,
tbl_name,
files,
pattern,
from,
..
} => {
write!(f, "Copy into {:}.{:}", db_name, tbl_name)?;
write!(f, ", {:?}", from)?;
if !files.is_empty() {
write!(f, " ,files:{:?}", files)?;
}
if !pattern.is_empty() {
write!(f, " ,pattern:{:?}", pattern)?;
}
write!(f, " ,validation_mode:{:?}", self.validation_mode)?;
}
CopyMode::IntoStage {
stage_table_info,
query,
} => {
write!(f, "Copy into {:?}", stage_table_info)?;
write!(f, ", query: {:?})", query)?;
}
}
write!(f, " ,validation_mode:{:?}", self.validation_mode)
Ok(())
}
}
161 changes: 105 additions & 56 deletions query/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::operator_list_files;
use common_planners::CopyMode;
use common_planners::CopyPlan;
use common_planners::PlanNode;
use common_planners::ReadDataSourcePlan;
use common_planners::SelectPlan;
use common_planners::SourceInfo;
use common_planners::StageTableInfo;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;
use futures::TryStreamExt;
use regex::Regex;

use super::SelectInterpreter;
use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
Expand All @@ -36,6 +41,7 @@ use crate::pipelines::new::executor::PipelinePullingExecutor;
use crate::pipelines::new::NewPipeline;
use crate::sessions::QueryContext;
use crate::storages::stage::StageSource;
use crate::storages::stage::StageTable;

pub struct CopyInterpreter {
ctx: Arc<QueryContext>,
Expand All @@ -53,14 +59,18 @@ impl CopyInterpreter {
// 2. If the plan.files is empty, there are also two case:
// 2.1 If the path is a file like /path/to/path/file, S3File::list() will return the same file path.
// 2.2 If the path is a folder, S3File::list() will return all the files in it.
async fn list_files(&self) -> Result<Vec<String>> {
let files = match &self.plan.from.source_info {
async fn list_files(
&self,
from: &ReadDataSourcePlan,
files: &Vec<String>,
) -> Result<Vec<String>> {
let files = match &from.source_info {
SourceInfo::StageSource(table_info) => {
let path = &table_info.path;
// Here we add the path to the file: /path/to/path/file1.
let files_with_path = if !self.plan.files.is_empty() {
let files_with_path = if !files.is_empty() {
let mut files_with_path = vec![];
for file in &self.plan.files {
for file in files {
let new_path = Path::new(path).join(file);
files_with_path.push(new_path.to_string_lossy().to_string());
}
Expand Down Expand Up @@ -100,12 +110,19 @@ impl CopyInterpreter {
// Note:
// We parse the `s3://` to ReadSourcePlan instead of to a SELECT plan is that:
#[tracing::instrument(level = "debug", name = "copy_files_to_table", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
async fn copy_files_to_table(&self, files: Vec<String>) -> Result<Vec<DataBlock>> {
async fn copy_files_to_table(
&self,
catalog_name: &String,
db_name: &String,
tbl_name: &String,
from: &ReadDataSourcePlan,
files: Vec<String>,
) -> Result<Vec<DataBlock>> {
let ctx = self.ctx.clone();
let settings = self.ctx.get_settings();

let mut pipeline = NewPipeline::create();
let read_source_plan = self.plan.from.clone();
let read_source_plan = from.clone();
let read_source_plan = Self::rewrite_read_plan_file_name(read_source_plan, files);
tracing::info!("copy_files_to_table: source plan:{:?}", read_source_plan);
let table = ctx.build_table_from_source_plan(&read_source_plan)?;
Expand All @@ -114,13 +131,7 @@ impl CopyInterpreter {
return Err(e);
}

let table = ctx
.get_table(
&self.plan.catalog_name,
&self.plan.db_name,
&self.plan.tbl_name,
)
.await?;
let table = ctx.get_table(catalog_name, db_name, tbl_name).await?;

if ctx.get_settings().get_enable_new_processor_framework()? != 0
&& self.ctx.get_cluster().is_empty()
Expand Down Expand Up @@ -149,58 +160,26 @@ impl CopyInterpreter {

Ok(operations)
}
}

#[async_trait::async_trait]
impl Interpreter for CopyInterpreter {
fn name(&self) -> &str {
"CopyInterpreter"
}

#[tracing::instrument(level = "debug", name = "copy_interpreter_execute", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))]
async fn execute(
async fn execute_copy_into_stage(
&self,
mut _input_stream: Option<SendableDataBlockStream>,
stage_table_info: &StageTableInfo,
query: &PlanNode,
) -> Result<SendableDataBlockStream> {
let mut files = self.list_files().await?;

// Pattern match check.
let pattern = &self.plan.pattern;
if !pattern.is_empty() {
let regex = Regex::new(pattern).map_err(|e| {
ErrorCode::SyntaxException(format!(
"Pattern format invalid, got:{}, error:{:?}",
pattern, e
))
})?;

let matched_files = files
.iter()
.filter(|file| regex.is_match(file))
.cloned()
.collect();
files = matched_files;
}
let table = StageTable::try_create(stage_table_info.clone())?;

tracing::info!("copy file list:{:?}, pattern:{}", &files, pattern,);
let select_interpreter = SelectInterpreter::try_create(self.ctx.clone(), SelectPlan {
input: Arc::new(query.clone()),
})?;

let write_results = self.copy_files_to_table(files).await?;
let stream = select_interpreter.execute(None).await?;
let results = table.append_data(self.ctx.clone(), stream).await?;

let table = self
.ctx
.get_table(
&self.plan.catalog_name,
&self.plan.db_name,
&self.plan.tbl_name,
)
.await?;

// Commit.
table
.commit_insertion(
self.ctx.clone(),
&self.plan.catalog_name,
write_results,
&self.ctx.get_current_catalog(),
results.try_collect().await?,
false,
)
.await?;
Expand All @@ -212,3 +191,73 @@ impl Interpreter for CopyInterpreter {
)))
}
}

#[async_trait::async_trait]
impl Interpreter for CopyInterpreter {
    fn name(&self) -> &str {
        "CopyInterpreter"
    }

    /// Executes the COPY plan, dispatching on `copy_mode`:
    /// - `IntoTable`: list source files, filter by the optional regex pattern,
    ///   copy them into the target table, then commit the insertion.
    /// - `IntoStage`: run the embedded query and append its output to the stage.
    ///
    /// `_input_stream` is ignored; COPY produces its own input from the plan.
    #[tracing::instrument(level = "debug", name = "copy_interpreter_execute", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))]
    async fn execute(
        &self,
        mut _input_stream: Option<SendableDataBlockStream>,
    ) -> Result<SendableDataBlockStream> {
        match &self.plan.copy_mode {
            CopyMode::IntoTable {
                catalog_name,
                db_name,
                tbl_name,
                files,
                pattern,
                from,
                ..
            } => {
                // Resolve the concrete file list from the source (explicit
                // files, or a listing of the source path when none are given).
                let mut files = self.list_files(from, files).await?;
                // Pattern match check.
                let pattern = &pattern;
                if !pattern.is_empty() {
                    // An invalid regex is a user error, surfaced as a syntax error.
                    let regex = Regex::new(pattern).map_err(|e| {
                        ErrorCode::SyntaxException(format!(
                            "Pattern format invalid, got:{}, error:{:?}",
                            pattern, e
                        ))
                    })?;

                    // Keep only files whose full path matches the pattern.
                    let matched_files = files
                        .iter()
                        .filter(|file| regex.is_match(file))
                        .cloned()
                        .collect();
                    files = matched_files;
                }

                tracing::info!("copy file list:{:?}, pattern:{}", &files, pattern,);

                // Read the selected files and write them into the table;
                // returns the append operations to be committed below.
                let write_results = self
                    .copy_files_to_table(catalog_name, db_name, tbl_name, from, files)
                    .await?;

                let table = self.ctx.get_table(catalog_name, db_name, tbl_name).await?;

                // Commit.
                table
                    .commit_insertion(self.ctx.clone(), catalog_name, write_results, false)
                    .await?;

                // COPY INTO table returns an empty result stream.
                Ok(Box::pin(DataBlockStream::create(
                    self.plan.schema(),
                    None,
                    vec![],
                )))
            }
            CopyMode::IntoStage {
                stage_table_info,
                query,
            } => {
                self.execute_copy_into_stage(stage_table_info, query.as_ref())
                    .await
            }
        }
    }
}
Loading