From cc4d3f25250405062ee21e9c83620af7f4b0d610 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 29 Nov 2024 12:50:35 +0800 Subject: [PATCH] feat: copy support option COLUMN_MATCH_MODE (#16963) * rm unused code. * remove dup code * feat: parquet support copy option `COLUMN_MATCH_MODE` --- Cargo.lock | 1 + src/meta/app/src/principal/file_format.rs | 22 ++++ src/meta/app/src/principal/user_stage.rs | 90 +-------------- src/query/ast/src/ast/statements/copy.rs | 108 ++++++++++++------ src/query/ast/src/ast/statements/stage.rs | 8 ++ src/query/ast/src/parser/copy.rs | 4 + src/query/ast/src/parser/stage.rs | 4 + src/query/ast/src/parser/token.rs | 4 + .../ast/tests/it/testdata/stmt-error.txt | 6 +- src/query/ast/tests/it/testdata/stmt.txt | 60 +++++++--- src/query/catalog/src/table_context.rs | 1 + src/query/service/src/sessions/query_ctx.rs | 2 + .../service/src/sessions/query_ctx_shared.rs | 2 +- .../bind_table_reference/bind_location.rs | 12 +- .../sql/src/planner/binder/copy_into_table.rs | 34 +++++- src/query/sql/src/planner/binder/table.rs | 9 +- .../stage/src/read/columnar/projection.rs | 30 ++++- .../orc/src/copy_into_table/projection.rs | 1 + src/query/storages/parquet/Cargo.toml | 1 + .../src/parquet_rs/copy_into_table/reader.rs | 4 +- .../src/parquet_rs/copy_into_table/table.rs | 4 + .../src/parquet_rs/parquet_table/table.rs | 12 +- .../storages/parquet/src/parquet_rs/schema.rs | 13 ++- .../stage/options/case_sensitive_parquet.test | 95 +++++++++++++++ 24 files changed, 359 insertions(+), 168 deletions(-) create mode 100644 tests/sqllogictests/suites/stage/options/case_sensitive_parquet.test diff --git a/Cargo.lock b/Cargo.lock index ae886fc866be..8f190d2e7627 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4378,6 +4378,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "databend-common-ast", "databend-common-base", "databend-common-catalog", "databend-common-exception", diff --git a/src/meta/app/src/principal/file_format.rs b/src/meta/app/src/principal/file_format.rs index a97df1e4ea9f..445b802c8138 100644 --- a/src/meta/app/src/principal/file_format.rs +++ b/src/meta/app/src/principal/file_format.rs @@ -18,6 +18,8 @@ use std::fmt::Display; use std::fmt::Formatter; use std::str::FromStr; +use databend_common_ast::ast::ColumnMatchMode; +use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_ast::ast::FileFormatOptions; use databend_common_ast::ast::FileFormatValue; use databend_common_exception::ErrorCode; @@ -107,6 +109,26 @@ impl FileFormatParams { } } + pub fn check_copy_options(&self, options: &mut CopyIntoTableOptions) -> Result<()> { + if let Some(m) = &options.column_match_mode { + match self { + FileFormatParams::Parquet(_) => { + if let ColumnMatchMode::Position = m { + return Err(ErrorCode::BadArguments( + "COLUMN_MATCH_MODE=POSITION not supported yet.", + )); + } + } + _ => { + return Err(ErrorCode::BadArguments( + "COLUMN_MATCH_MODE can only apply to Parquet for now.", + )); + } + } + } + Ok(()) + } + pub fn need_field_default(&self) -> bool { match self { FileFormatParams::Parquet(v) => v.missing_field_as == NullAs::FieldDefault, diff --git a/src/meta/app/src/principal/user_stage.rs b/src/meta/app/src/principal/user_stage.rs index ddeeca304de7..f0673a92d1e7 100644 --- a/src/meta/app/src/principal/user_stage.rs +++ b/src/meta/app/src/principal/user_stage.rs @@ -20,6 +20,7 @@ use std::str::FromStr; use chrono::DateTime; use chrono::Utc; +pub use databend_common_ast::ast::OnErrorMode; use databend_common_exception::ErrorCode; use 
databend_common_exception::Result; use databend_common_io::constants::NAN_BYTES_SNAKE; @@ -400,95 +401,6 @@ pub struct StageParams { pub storage: StorageParams, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Copy)] -pub enum OnErrorMode { - Continue, - SkipFileNum(u64), - AbortNum(u64), -} - -impl Default for OnErrorMode { - fn default() -> Self { - Self::AbortNum(1) - } -} - -impl FromStr for OnErrorMode { - type Err = String; - - fn from_str(s: &str) -> std::result::Result { - match s.to_uppercase().as_str() { - "" | "ABORT" => Ok(OnErrorMode::AbortNum(1)), - "CONTINUE" => Ok(OnErrorMode::Continue), - "SKIP_FILE" => Ok(OnErrorMode::SkipFileNum(1)), - v => { - if v.starts_with("ABORT_") { - let num_str = v.replace("ABORT_", ""); - let nums = num_str.parse::(); - match nums { - Ok(n) if n < 1 => { - Err("OnError mode `ABORT_` num must be greater than 0".to_string()) - } - Ok(n) => Ok(OnErrorMode::AbortNum(n)), - Err(_) => Err(format!( - "Unknown OnError mode:{:?}, must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_ | ABORT | ABORT_ }}", - v - )), - } - } else { - let num_str = v.replace("SKIP_FILE_", ""); - let nums = num_str.parse::(); - match nums { - Ok(n) if n < 1 => { - Err("OnError mode `SKIP_FILE_` num must be greater than 0" - .to_string()) - } - Ok(n) => Ok(OnErrorMode::SkipFileNum(n)), - Err(_) => Err(format!( - "Unknown OnError mode:{:?}, must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_ | ABORT | ABORT_ }}", - v - )), - } - } - } - } - } -} - -impl Display for OnErrorMode { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - match self { - OnErrorMode::Continue => { - write!(f, "continue") - } - OnErrorMode::SkipFileNum(n) => { - if *n <= 1 { - write!(f, "skipfile") - } else { - write!(f, "skipfile_{}", n) - } - } - OnErrorMode::AbortNum(n) => { - if *n <= 1 { - write!(f, "abort") - } else { - write!(f, "abort_{}", n) - } - } - } - } -} - -impl From for OnErrorMode { - fn from(opt: databend_common_ast::ast::OnErrorMode) -> Self { - match opt { - databend_common_ast::ast::OnErrorMode::Continue => OnErrorMode::Continue, - databend_common_ast::ast::OnErrorMode::SkipFileNum(n) => OnErrorMode::SkipFileNum(n), - databend_common_ast::ast::OnErrorMode::AbortNum(n) => OnErrorMode::AbortNum(n), - } - } -} - #[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq)] #[serde(default)] pub struct CopyOptions { diff --git a/src/query/ast/src/ast/statements/copy.rs b/src/query/ast/src/ast/statements/copy.rs index 241268dbd4dc..c31ed3096c0e 100644 --- a/src/query/ast/src/ast/statements/copy.rs +++ b/src/query/ast/src/ast/statements/copy.rs @@ -78,6 +78,9 @@ impl CopyIntoTableStmt { CopyIntoTableOption::DisableVariantCheck(v) => self.options.disable_variant_check = v, CopyIntoTableOption::ReturnFailedOnly(v) => self.options.return_failed_only = v, CopyIntoTableOption::OnError(v) => self.options.on_error = OnErrorMode::from_str(&v)?, + CopyIntoTableOption::ColumnMatchMode(v) => { + self.options.column_match_mode = Some(ColumnMatchMode::from_str(&v)?) 
+ } } Ok(()) } @@ -111,37 +114,7 @@ impl Display for CopyIntoTableStmt { if !self.file_format.is_empty() { write!(f, " FILE_FORMAT = ({})", self.file_format)?; } - - if !self.options.validation_mode.is_empty() { - write!(f, "VALIDATION_MODE = {}", self.options.validation_mode)?; - } - - if self.options.size_limit != 0 { - write!(f, " SIZE_LIMIT = {}", self.options.size_limit)?; - } - - if self.options.max_files != 0 { - write!(f, " MAX_FILES = {}", self.options.max_files)?; - } - - if self.options.split_size != 0 { - write!(f, " SPLIT_SIZE = {}", self.options.split_size)?; - } - - write!(f, " PURGE = {}", self.options.purge)?; - write!(f, " FORCE = {}", self.options.force)?; - write!( - f, - " DISABLE_VARIANT_CHECK = {}", - self.options.disable_variant_check - )?; - write!(f, " ON_ERROR = {}", self.options.on_error)?; - write!( - f, - " RETURN_FAILED_ONLY = {}", - self.options.return_failed_only - )?; - + write!(f, " {}", self.options)?; Ok(()) } } @@ -159,6 +132,7 @@ pub struct CopyIntoTableOptions { pub disable_variant_check: bool, pub return_failed_only: bool, pub validation_mode: String, + pub column_match_mode: Option, } impl CopyIntoTableOptions { @@ -169,6 +143,10 @@ impl CopyIntoTableOptions { bool::from_str(v).map_err(|e| format!("can not parse {}={} as bool: {}", k, v, e)) } + pub fn set_column_match_mode(&mut self, mode: ColumnMatchMode) { + self.column_match_mode = Some(mode); + } + pub fn apply( &mut self, opts: &BTreeMap, @@ -183,6 +161,10 @@ impl CopyIntoTableOptions { let on_error = OnErrorMode::from_str(v)?; self.on_error = on_error; } + "column_match_mode" => { + let column_match_mode = ColumnMatchMode::from_str(v)?; + self.column_match_mode = Some(column_match_mode); + } "size_limit" => { self.size_limit = Self::parse_uint(k, v)?; } @@ -214,13 +196,30 @@ impl CopyIntoTableOptions { impl Display for CopyIntoTableOptions { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "OnErrorMode {}", self.on_error)?; - write!(f, "SizeLimit {}", self.size_limit)?; - write!(f, "MaxFiles {}", self.max_files)?; - write!(f, "SplitSize {}", self.split_size)?; - write!(f, "Purge {}", self.purge)?; - write!(f, "DisableVariantCheck {}", self.disable_variant_check)?; - write!(f, "ReturnFailedOnly {}", self.return_failed_only)?; + if !self.validation_mode.is_empty() { + write!(f, "VALIDATION_MODE = {}", self.validation_mode)?; + } + + if self.size_limit != 0 { + write!(f, " SIZE_LIMIT = {}", self.size_limit)?; + } + + if self.max_files != 0 { + write!(f, " MAX_FILES = {}", self.max_files)?; + } + + if self.split_size != 0 { + write!(f, " SPLIT_SIZE = {}", self.split_size)?; + } + + write!(f, " PURGE = {}", self.purge)?; + write!(f, " FORCE = {}", self.force)?; + write!(f, " DISABLE_VARIANT_CHECK = {}", self.disable_variant_check)?; + write!(f, " ON_ERROR = {}", self.on_error)?; + write!(f, " RETURN_FAILED_ONLY = {}", self.return_failed_only)?; + if let Some(mode) = &self.column_match_mode { + write!(f, " COLUMN_MATCH_MODE = {}", mode)?; + } Ok(()) } } @@ -572,6 +571,7 @@ pub enum CopyIntoTableOption { DisableVariantCheck(bool), ReturnFailedOnly(bool), OnError(String), + ColumnMatchMode(String), } pub enum CopyIntoLocationOption { @@ -714,3 +714,35 @@ impl FromStr for OnErrorMode { } } } + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)] +pub enum ColumnMatchMode { + CaseSensitive, + CaseInsensitive, + Position, +} + +impl Display for ColumnMatchMode { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + 
ColumnMatchMode::CaseSensitive => write!(f, "CASE_SENSITIVE"), + ColumnMatchMode::CaseInsensitive => write!(f, "CASE_INSENSITIVE"), + ColumnMatchMode::Position => write!(f, "POSITION"), + } + } +} + +const COLUMN_MATCH_MODE_MSG: &str = + "ColumnMatchMode must be one of {{ CASE_SENSITIVE | CASE_INSENSITIVE | POSITION }}"; +impl FromStr for ColumnMatchMode { + type Err = &'static str; + + fn from_str(s: &str) -> std::result::Result { + match s.to_uppercase().as_str() { + "CASE_SENSITIVE" => Ok(Self::CaseSensitive), + "CASE_INSENSITIVE" => Ok(Self::CaseInsensitive), + "POSITION" => Ok(Self::Position), + _ => Err(COLUMN_MATCH_MODE_MSG), + } + } +} diff --git a/src/query/ast/src/ast/statements/stage.rs b/src/query/ast/src/ast/statements/stage.rs index 428d40e4239c..196c757d9c43 100644 --- a/src/query/ast/src/ast/statements/stage.rs +++ b/src/query/ast/src/ast/statements/stage.rs @@ -72,6 +72,7 @@ pub enum SelectStageOption { Pattern(LiteralStringOrVariable), FileFormat(String), Connection(BTreeMap), + CaseSensitive(bool), } impl SelectStageOptions { @@ -83,6 +84,7 @@ impl SelectStageOptions { SelectStageOption::Pattern(v) => options.pattern = Some(v), SelectStageOption::FileFormat(v) => options.file_format = Some(v), SelectStageOption::Connection(v) => options.connection = v, + SelectStageOption::CaseSensitive(v) => options.case_sensitive = Some(v), } } options @@ -95,6 +97,7 @@ pub struct SelectStageOptions { pub pattern: Option, pub file_format: Option, pub connection: BTreeMap, + pub case_sensitive: Option, } impl SelectStageOptions { @@ -103,6 +106,7 @@ impl SelectStageOptions { && self.pattern.is_none() && self.file_format.is_none() && self.connection.is_empty() + && self.case_sensitive.is_none() } } @@ -139,6 +143,10 @@ impl Display for SelectStageOptions { write!(f, " PATTERN => {},", pattern)?; } + if let Some(case_sensitive) = self.case_sensitive { + write!(f, " CASE_SENSITIVE => {},", case_sensitive)?; + } + if !self.connection.is_empty() { write!(f, " CONNECTION => (")?; write_comma_separated_string_map(f, &self.connection)?; diff --git a/src/query/ast/src/parser/copy.rs b/src/query/ast/src/parser/copy.rs index 7692e3b92b09..faac48847a19 100644 --- a/src/query/ast/src/parser/copy.rs +++ b/src/query/ast/src/parser/copy.rs @@ -174,6 +174,10 @@ fn copy_into_table_option(i: Input) -> IResult { map(rule! { ON_ERROR ~ "=" ~ #ident }, |(_, _, on_error)| { CopyIntoTableOption::OnError(on_error.to_string()) }), + map( + rule! { COLUMN_MATCH_MODE ~ "=" ~ #ident }, + |(_, _, mode)| CopyIntoTableOption::ColumnMatchMode(mode.to_string()), + ), map( rule! { DISABLE_VARIANT_CHECK ~ "=" ~ #literal_bool }, |(_, _, disable_variant_check)| { diff --git a/src/query/ast/src/parser/stage.rs b/src/query/ast/src/parser/stage.rs index 8add065bd990..c545f38a3b06 100644 --- a/src/query/ast/src/parser/stage.rs +++ b/src/query/ast/src/parser/stage.rs @@ -262,5 +262,9 @@ pub fn select_stage_option(i: Input) -> IResult { rule! { CONNECTION ~ ^"=>" ~ ^#connection_options }, |(_, _, file_format)| SelectStageOption::Connection(file_format), ), + map( + rule! 
{ CASE_SENSITIVE ~ ^"=>" ~ ^#literal_bool }, + |(_, _, case_sensitive)| SelectStageOption::CaseSensitive(case_sensitive), + ), ))(i) } diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index 232cd7210a23..3fbbbf6f37fd 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -423,6 +423,8 @@ pub enum TokenKind { CALL, #[token("CASE", ignore(ascii_case))] CASE, + #[token("CASE_SENSITIVE", ignore(ascii_case))] + CASE_SENSITIVE, #[token("CAST", ignore(ascii_case))] CAST, #[token("CATALOG", ignore(ascii_case))] @@ -455,6 +457,8 @@ pub enum TokenKind { CHAR, #[token("COLUMN", ignore(ascii_case))] COLUMN, + #[token("COLUMN_MATCH_MODE", ignore(ascii_case))] + COLUMN_MATCH_MODE, #[token("COLUMNS", ignore(ascii_case))] COLUMNS, #[token("CHARACTER", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/testdata/stmt-error.txt b/src/query/ast/tests/it/testdata/stmt-error.txt index 7a5779795204..7d954117f30a 100644 --- a/src/query/ast/tests/it/testdata/stmt-error.txt +++ b/src/query/ast/tests/it/testdata/stmt-error.txt @@ -321,7 +321,7 @@ error: --> SQL:1:38 | 1 | COPY INTO mytable FROM 's3://bucket' CONECTION= (); - | ^^^^^^^^^ unexpected `CONECTION`, expecting `CONNECTION`, `ON_ERROR`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `PATTERN`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `MAX_FILES`, `DISABLE_VARIANT_CHECK`, `SPLIT_SIZE`, or `;` + | ^^^^^^^^^ unexpected `CONECTION`, expecting `CONNECTION`, `ON_ERROR`, `COLUMN_MATCH_MODE`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `PATTERN`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `MAX_FILES`, `DISABLE_VARIANT_CHECK`, `SPLIT_SIZE`, or `;` ---------- Input ---------- @@ -331,7 +331,7 @@ error: --> SQL:1:33 | 1 | COPY INTO mytable FROM @mystage CONNECTION = (); - | ^^^^^^^^^^ unexpected `CONNECTION`, expecting `ON_ERROR`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `DISABLE_VARIANT_CHECK`, `PATTERN`, `MAX_FILES`, `SPLIT_SIZE`, or `;` + | ^^^^^^^^^^ unexpected `CONNECTION`, expecting `ON_ERROR`, `COLUMN_MATCH_MODE`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `DISABLE_VARIANT_CHECK`, `PATTERN`, `MAX_FILES`, `SPLIT_SIZE`, or `;` ---------- Input ---------- @@ -594,7 +594,7 @@ error: --> SQL:1:57 | 1 | select $1 from @data/csv/books.csv (file_format => 'aa' bad_arg => 'x', pattern => 'bb') - | ------ ^^^^^^^ unexpected `bad_arg`, expecting `PATTERN`, `FILE_FORMAT`, `)`, `,`, `FILES`, or `CONNECTION` + | ------ ^^^^^^^ unexpected `bad_arg`, expecting `PATTERN`, `FILE_FORMAT`, `CASE_SENSITIVE`, `)`, `,`, `FILES`, or `CONNECTION` | | | while parsing `SELECT ...` diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 206022cca94e..397661bc18bf 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -9526,6 +9526,7 @@ Query( "tsv", ), connection: {}, + case_sensitive: None, }, alias: None, }, @@ -13853,7 +13854,7 @@ COPY INTO mytable FROM '@~/mybucket/my data.csv' size_limit=10; ---------- Output --------- -COPY INTO mytable FROM '@~/mybucket/my data.csv' SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM '@~/mybucket/my data.csv' SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -13895,6 
+13896,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -13911,7 +13913,7 @@ COPY INTO mytable ) size_limit=10; ---------- Output --------- -COPY INTO mytable FROM '@~/mybucket/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM '@~/mybucket/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -13966,6 +13968,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -13983,7 +13986,7 @@ COPY INTO mytable size_limit=10 max_files=10; ---------- Output --------- -COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 MAX_FILES = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 MAX_FILES = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14046,6 +14049,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14063,7 +14067,7 @@ COPY INTO mytable size_limit=10 max_files=3000; ---------- Output --------- -COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 MAX_FILES = 3000 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 MAX_FILES = 3000 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14126,6 +14130,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14145,7 +14150,7 @@ COPY INTO mytable ) size_limit=10; ---------- Output --------- -COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url = 'http://127.0.0.1:9900' ) FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url = 'http://127.0.0.1:9900' ) FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14210,6 +14215,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: 
false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14229,7 +14235,7 @@ COPY INTO mytable skip_header = 1 ); ---------- Output --------- -COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url = 'http://127.0.0.1:9900' ) FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url = 'http://127.0.0.1:9900' ) FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14294,6 +14300,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14303,7 +14310,7 @@ CopyIntoTable( COPY INTO mytable FROM 'https://127.0.0.1:9900'; ---------- Output --------- -COPY INTO mytable FROM 'https://127.0.0.1:9900/' PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 'https://127.0.0.1:9900/' PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14353,6 +14360,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14362,7 +14370,7 @@ CopyIntoTable( COPY INTO mytable FROM 'https://127.0.0.1:'; ---------- Output --------- -COPY INTO mytable FROM 'https://127.0.0.1/' PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 'https://127.0.0.1/' PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14412,6 +14420,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14429,7 +14438,7 @@ COPY INTO mytable ) size_limit=10; ---------- Output --------- -COPY INTO mytable FROM '@my_stage' FILE_FORMAT = (error_on_column_count_mismatch = false, field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM '@my_stage' FILE_FORMAT = (error_on_column_count_mismatch = false, field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14487,6 +14496,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14682,7 +14692,7 @@ COPY INTO mytable ) size_limit=10; ---------- Output --------- -COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( aws_key_id = 'access_key', aws_secret_key = 'secret_key' ) FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY 
INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( aws_key_id = 'access_key', aws_secret_key = 'secret_key' ) FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14748,6 +14758,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14764,7 +14775,7 @@ COPY INTO mytable ) size_limit=10; ---------- Output --------- -COPY INTO mytable FROM '@external_stage/path/to/file.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM '@external_stage/path/to/file.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14819,6 +14830,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14835,7 +14847,7 @@ COPY INTO mytable ) size_limit=10; ---------- Output --------- -COPY INTO mytable FROM '@external_stage/path/to/dir/' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM '@external_stage/path/to/dir/' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14890,6 +14902,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14906,7 +14919,7 @@ COPY INTO mytable ) force=true; ---------- Output --------- -COPY INTO mytable FROM '@external_stage/path/to/file.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) PURGE = false FORCE = true DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM '@external_stage/path/to/file.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) PURGE = false FORCE = true DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -14961,6 +14974,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -14978,7 +14992,7 @@ COPY INTO mytable size_limit=10 disable_variant_check=true; ---------- Output --------- -COPY INTO mytable FROM 'fs:///path/to/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = false FORCE = false DISABLE_VARIANT_CHECK = true ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO mytable FROM 'fs:///path/to/data.csv' FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SIZE_LIMIT = 10 PURGE = 
false FORCE = false DISABLE_VARIANT_CHECK = true ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -15041,6 +15055,7 @@ CopyIntoTable( disable_variant_check: true, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -15056,7 +15071,7 @@ COPY INTO books FROM 's3://databend/books.csv' ) FILE_FORMAT = (type = CSV); ---------- Output --------- -COPY INTO books FROM 's3://databend/books.csv' CONNECTION = ( access_key_id = 'ROOTUSER', endpoint_url = 'http://localhost:9000/', region = 'us-west-2', secret_access_key = 'CHANGEME123' ) FILE_FORMAT = (type = CSV) PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +COPY INTO books FROM 's3://databend/books.csv' CONNECTION = ( access_key_id = 'ROOTUSER', endpoint_url = 'http://localhost:9000/', region = 'us-west-2', secret_access_key = 'CHANGEME123' ) FILE_FORMAT = (type = CSV) PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CopyIntoTable( CopyIntoTableStmt { @@ -15115,6 +15130,7 @@ CopyIntoTable( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, ) @@ -16893,6 +16909,7 @@ Query( pattern: None, file_format: None, connection: {}, + case_sensitive: None, }, alias: None, }, @@ -16985,6 +17002,7 @@ Query( "PARQUET", ), connection: {}, + case_sensitive: None, }, alias: Some( TableAlias { @@ -17128,6 +17146,7 @@ Query( "parquet", ), connection: {}, + case_sensitive: None, }, alias: Some( TableAlias { @@ -17252,6 +17271,7 @@ Query( connection: { "endpoint_url": "xxx", }, + case_sensitive: None, }, alias: Some( TableAlias { @@ -20080,7 +20100,7 @@ CreateTask( ---------- Input ---------- CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 12 * * *' AS copy into streams_test.paper_table from @stream_stage FILE_FORMAT = (TYPE = PARQUET) PURGE=true ---------- Output --------- -CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 12 * * *' AS COPY INTO streams_test.paper_table FROM '@stream_stage' FILE_FORMAT = (type = PARQUET) PURGE = true FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 12 * * *' AS COPY INTO streams_test.paper_table FROM '@stream_stage' FILE_FORMAT = (type = PARQUET) PURGE = true FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CreateTask( CreateTaskStmt { @@ -20102,7 +20122,7 @@ CreateTask( after: [], when_condition: None, sql: SingleStatement( - "COPY INTO streams_test.paper_table FROM '@stream_stage' FILE_FORMAT = (type = PARQUET) PURGE = true FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false", + "COPY INTO streams_test.paper_table FROM '@stream_stage' FILE_FORMAT = (type = PARQUET) PURGE = true FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false", ), }, ) @@ -20893,7 +20913,7 @@ ShowLocks( ---------- Input ---------- CREATE PIPE IF NOT EXISTS MyPipe1 AUTO_INGEST = TRUE COMMENT = 'This is test pipe 1' AS COPY INTO MyTable1 FROM '@~/MyStage1' FILE_FORMAT = (TYPE = 'CSV') ---------- Output --------- -CREATE PIPE IF NOT EXISTS MyPipe1 AUTO_INGEST = TRUE COMMENTS = 'This is test pipe 1' AS COPY INTO MyTable1 FROM '@~/MyStage1' FILE_FORMAT = (type = 'CSV') PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false 
ON_ERROR = abort RETURN_FAILED_ONLY = false +CREATE PIPE IF NOT EXISTS MyPipe1 AUTO_INGEST = TRUE COMMENTS = 'This is test pipe 1' AS COPY INTO MyTable1 FROM '@~/MyStage1' FILE_FORMAT = (type = 'CSV') PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CreatePipe( CreatePipeStmt { @@ -20944,6 +20964,7 @@ CreatePipe( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, }, @@ -20953,7 +20974,7 @@ CreatePipe( ---------- Input ---------- CREATE PIPE pipe1 AS COPY INTO db1.MyTable1 FROM @~/mybucket/data.csv ---------- Output --------- -CREATE PIPE pipe1 AS COPY INTO db1.MyTable1 FROM '@~/mybucket/data.csv' PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false +CREATE PIPE pipe1 AS COPY INTO db1.MyTable1 FROM '@~/mybucket/data.csv' PURGE = false FORCE = false DISABLE_VARIANT_CHECK = false ON_ERROR = abort RETURN_FAILED_ONLY = false ---------- AST ------------ CreatePipe( CreatePipeStmt { @@ -21009,6 +21030,7 @@ CreatePipe( disable_variant_check: false, return_failed_only: false, validation_mode: "", + column_match_mode: None, }, }, }, diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 5b3fec728981..c52ea832a4e6 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -368,6 +368,7 @@ pub trait TableContext: Send + Sync { _files_info: StageFilesInfo, _files_to_copy: Option>, _max_column_position: usize, + _case_sensitive: bool, ) -> Result> { unimplemented!() } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 85df6db7987e..2b86c16706df 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1330,6 +1330,7 @@ impl TableContext for QueryContext { files_info: StageFilesInfo, files_to_copy: Option>, max_column_position: usize, + case_sensitive: bool, ) -> Result> { match stage_info.file_format_params { FileFormatParams::Parquet(..) 
=> { @@ -1354,6 +1355,7 @@ impl TableContext for QueryContext { files_to_copy, self.get_settings(), self.get_query_kind(), + case_sensitive, ) .await } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 429a30cc4970..145057beb63e 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -237,7 +237,7 @@ impl QueryContextShared { } pub fn get_on_error_mode(&self) -> Option { - *self.on_error_mode.read() + self.on_error_mode.read().clone() } pub fn set_on_error_mode(&self, mode: OnErrorMode) { diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_location.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_location.rs index 196dc6566dfa..72168a47f458 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_location.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_location.rs @@ -67,8 +67,16 @@ impl Binder { files: options.files.clone(), }; let table_ctx = self.ctx.clone(); - self.bind_stage_table(table_ctx, bind_context, stage_info, files_info, alias, None) - .await + self.bind_stage_table( + table_ctx, + bind_context, + stage_info, + files_info, + alias, + None, + options.case_sensitive.unwrap_or(false), + ) + .await }) } } diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index de9dd5c660e7..aa6582b35078 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -16,6 +16,7 @@ use std::str::FromStr; use std::sync::Arc; use databend_common_ast::ast::ColumnID as AstColumnID; +use databend_common_ast::ast::ColumnMatchMode; use databend_common_ast::ast::ColumnRef; use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_ast::ast::CopyIntoTableSource; @@ -35,6 +36,7 @@ use databend_common_ast::ast::TableReference; use databend_common_ast::ast::TypeName; use databend_common_ast::parser::parse_values_with_placeholder; use databend_common_ast::parser::tokenize_sql; +use databend_common_ast::Span; use databend_common_catalog::plan::list_stage_files; use databend_common_catalog::plan::StageTableInfo; use databend_common_catalog::table_context::StageAttachment; @@ -157,6 +159,10 @@ impl<'a> Binder { if !stmt.file_format.is_empty() { stage_info.file_format_params = self.try_resolve_file_format(&stmt.file_format).await?; } + let mut options = stmt.options.clone(); + stage_info + .file_format_params + .check_copy_options(&mut options)?; if !(stmt.options.purge && stmt.options.force) && stmt.options.max_files > COPY_MAX_FILES_PER_COMMIT @@ -235,15 +241,25 @@ impl<'a> Binder { if use_query { let mut select_list = Vec::with_capacity(plan.required_source_schema.num_fields()); + let case_sensitive = plan + .stage_table_info + .copy_into_table_options + .column_match_mode + == Some(ColumnMatchMode::CaseSensitive); for dest_field in plan.required_source_schema.fields().iter() { let column = Expr::ColumnRef { span: None, column: ColumnRef { database: None, table: None, - column: AstColumnID::Name(Identifier::from_name( + column: AstColumnID::Name(Identifier::from_name_with_quoted( None, - dest_field.name().to_string(), + if case_sensitive { + dest_field.name().to_string() + } else { + dest_field.name().to_lowercase().to_string() + }, + Some('"'), )), }, }; @@ -260,7 +276,10 @@ impl<'a> Binder { }; select_list.push(SelectTarget::AliasedExpr { expr: Box::new(expr), - 
alias: None, + alias: Some(Identifier::from_name( + Span::None, + dest_field.name().to_string(), + )), }); } @@ -301,6 +320,9 @@ impl<'a> Binder { copy_options.apply(options, true)?; } copy_options.force = true; + stage_info + .file_format_params + .check_copy_options(&mut copy_options)?; let files_info = StageFilesInfo { path, @@ -400,6 +422,11 @@ impl<'a> Binder { if plan.no_file_to_copy { return Ok(Plan::CopyIntoTable(Box::new(plan))); } + let case_sensitive = plan + .stage_table_info + .copy_into_table_options + .column_match_mode + == Some(ColumnMatchMode::CaseSensitive); let table_ctx = self.ctx.clone(); let (s_expr, mut from_context) = self @@ -410,6 +437,7 @@ impl<'a> Binder { plan.stage_table_info.files_info.clone(), alias, plan.stage_table_info.files_to_copy.clone(), + case_sensitive, ) .await?; diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 5502dccb6acd..a7560c519ab5 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -115,11 +115,18 @@ impl Binder { files_info: StageFilesInfo, alias: &Option, files_to_copy: Option>, + case_sensitive: bool, ) -> Result<(SExpr, BindContext)> { let start = std::time::Instant::now(); let max_column_position = self.metadata.read().get_max_column_position(); let table = table_ctx - .create_stage_table(stage_info, files_info, files_to_copy, max_column_position) + .create_stage_table( + stage_info, + files_info, + files_to_copy, + max_column_position, + case_sensitive, + ) .await?; let table_alias_name = if let Some(table_alias) = alias { diff --git a/src/query/storages/common/stage/src/read/columnar/projection.rs b/src/query/storages/common/stage/src/read/columnar/projection.rs index 2211d6771e0b..eb67312c50bd 100644 --- a/src/query/storages/common/stage/src/read/columnar/projection.rs +++ b/src/query/storages/common/stage/src/read/columnar/projection.rs @@ -33,25 +33,37 @@ pub fn project_columnar( null_as: &NullAs, default_values: &Option>, location: &str, + case_sensitive: bool, ) -> databend_common_exception::Result<(Vec, Vec)> { let mut pushdown_columns = vec![]; let mut output_projection = vec![]; for (i, to_field) in output_schema.fields().iter().enumerate() { let field_name = to_field.name(); - let expr = match input_schema + let positions = input_schema .fields() .iter() - .position(|f| f.name() == field_name) - { - Some(pos) => { + .enumerate() + .filter(|(_, f)| { + if case_sensitive { + f.name() == field_name + } else { + f.name().to_lowercase() == field_name.to_lowercase() + } + }) + .map(|(pos, _)| pos) + .collect::>(); + + let expr = match positions.len() { + 1 => { + let pos = positions[0]; pushdown_columns.push(pos); let from_field = input_schema.field(pos); let expr = Expr::ColumnRef { span: None, id: pos, data_type: from_field.data_type().into(), - display_name: from_field.name().clone(), + display_name: to_field.name().clone(), }; if from_field.data_type == to_field.data_type { @@ -80,7 +92,7 @@ pub fn project_columnar( } } } - None => { + 0 => { match null_as { // default NullAs::Error => { @@ -111,6 +123,12 @@ pub fn project_columnar( } } } + _ => { + return Err(ErrorCode::BadArguments(format!( + "multi field named {} in file {}", + field_name, location + ))); + } }; output_projection.push(expr); } diff --git a/src/query/storages/orc/src/copy_into_table/projection.rs b/src/query/storages/orc/src/copy_into_table/projection.rs index 378bd4340d6d..0ac410e09dcf 100644 --- 
a/src/query/storages/orc/src/copy_into_table/projection.rs +++ b/src/query/storages/orc/src/copy_into_table/projection.rs @@ -55,6 +55,7 @@ impl ProjectionFactory { &self.null_as, &self.default_values, location, + false, )? .0; self.projections.insert(schema.clone(), v.clone()); diff --git a/src/query/storages/parquet/Cargo.toml b/src/query/storages/parquet/Cargo.toml index f93a0483dd6c..2528d4bc9617 100644 --- a/src/query/storages/parquet/Cargo.toml +++ b/src/query/storages/parquet/Cargo.toml @@ -19,6 +19,7 @@ async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +databend-common-ast = { workspace = true } databend-common-base = { workspace = true } databend-common-catalog = { workspace = true } databend-common-exception = { workspace = true } diff --git a/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs b/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs index 348bd76660c9..7f297a8e7366 100644 --- a/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs +++ b/src/query/storages/parquet/src/parquet_rs/copy_into_table/reader.rs @@ -75,10 +75,11 @@ impl RowGroupReaderForCopy { output_schema: TableSchemaRef, default_values: Option>, missing_as: &NullAs, + case_sensitive: bool, ) -> Result { let arrow_schema = infer_schema_with_extension(file_metadata)?; let schema_descr = file_metadata.schema_descr_ptr(); - let parquet_table_schema = Arc::new(arrow_to_table_schema(&arrow_schema)?); + let parquet_table_schema = Arc::new(arrow_to_table_schema(&arrow_schema, case_sensitive)?); let (mut output_projection, mut pushdown_columns) = project_columnar( &parquet_table_schema, @@ -86,6 +87,7 @@ impl RowGroupReaderForCopy { missing_as, &default_values, location, + case_sensitive, )?; pushdown_columns.sort(); let mapping = pushdown_columns diff --git a/src/query/storages/parquet/src/parquet_rs/copy_into_table/table.rs b/src/query/storages/parquet/src/parquet_rs/copy_into_table/table.rs index 7f29460dd02c..fe2b36f26ca8 100644 --- a/src/query/storages/parquet/src/parquet_rs/copy_into_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/copy_into_table/table.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use databend_common_ast::ast::ColumnMatchMode; use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfo; @@ -127,6 +128,8 @@ impl ParquetTableForCopy { "bug: ParquetTableForCopy::read_data must be called with StageSource", )); }; + let case_sensitive = stage_table_info.copy_into_table_options.column_match_mode + == Some(ColumnMatchMode::CaseSensitive); let fmt = match &stage_table_info.stage_info.file_format_params { FileFormatParams::Parquet(fmt) => fmt, @@ -155,6 +158,7 @@ impl ParquetTableForCopy { stage_table_info.schema.clone(), stage_table_info.default_values.clone(), &fmt.missing_field_as, + case_sensitive, )?); } } diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs index b0d5b1b1ca61..286f39498ec1 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs @@ -35,6 +35,7 @@ use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::TableField; +use databend_common_expression::TableSchema; 
use databend_common_meta_app::principal::StageInfo; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; @@ -121,6 +122,7 @@ impl ParquetRSTable { files_to_read: Option>, settings: Arc, query_kind: QueryKind, + case_sensitive: bool, ) -> Result> { let operator = init_stage_operator(&stage_info)?; let first_file = match &files_to_read { @@ -131,7 +133,8 @@ impl ParquetRSTable { let (arrow_schema, schema_descr, compression_ratio) = Self::prepare_metas(&first_file, operator.clone()).await?; - let table_info = create_parquet_table_info(&arrow_schema, &stage_info)?; + let schema = arrow_to_table_schema(&arrow_schema, case_sensitive)?.into(); + let table_info = create_parquet_table_info(schema, &stage_info)?; let leaf_fields = Arc::new(table_info.schema().leaf_fields()); // If the query is `COPY`, we don't need to collect column statistics. @@ -332,13 +335,16 @@ impl Table for ParquetRSTable { } } -fn create_parquet_table_info(schema: &ArrowSchema, stage_info: &StageInfo) -> Result { +fn create_parquet_table_info( + schema: Arc, + stage_info: &StageInfo, +) -> Result { Ok(TableInfo { ident: TableIdent::new(0, 0), desc: "''.'read_parquet'".to_string(), name: format!("read_parquet({})", stage_info.stage_name), meta: TableMeta { - schema: arrow_to_table_schema(schema)?.into(), + schema, engine: "SystemReadParquet".to_string(), created_on: DateTime::from_timestamp(0, 0).unwrap(), updated_on: DateTime::from_timestamp(0, 0).unwrap(), diff --git a/src/query/storages/parquet/src/parquet_rs/schema.rs b/src/query/storages/parquet/src/parquet_rs/schema.rs index 37fef98f2a01..062ad73037b5 100644 --- a/src/query/storages/parquet/src/parquet_rs/schema.rs +++ b/src/query/storages/parquet/src/parquet_rs/schema.rs @@ -43,11 +43,20 @@ pub(crate) fn lower_field_name(field: &ArrowField) -> ArrowField { } } -pub(crate) fn arrow_to_table_schema(schema: &ArrowSchema) -> Result { +pub(crate) fn arrow_to_table_schema( + schema: &ArrowSchema, + case_sensitive: bool, +) -> Result { let fields = schema .fields .iter() - .map(|f| Arc::new(lower_field_name(f))) + .map(|f| { + if case_sensitive { + f.clone() + } else { + Arc::new(lower_field_name(f)) + } + }) .collect::>(); let schema = ArrowSchema::new_with_metadata(fields, schema.metadata().clone()); TableSchema::try_from(&schema).map_err(ErrorCode::from_std_error) diff --git a/tests/sqllogictests/suites/stage/options/case_sensitive_parquet.test b/tests/sqllogictests/suites/stage/options/case_sensitive_parquet.test new file mode 100644 index 000000000000..8f1bc4777cc7 --- /dev/null +++ b/tests/sqllogictests/suites/stage/options/case_sensitive_parquet.test @@ -0,0 +1,95 @@ +statement ok +create or replace table t1 (a int, `B` int); + +statement ok +create or replace table t2 (a int, `A` int); + +statement ok +insert into t1 values (1, 2); + +statement ok +insert into t2 values (1, 2); + +statement ok +create or replace stage s1; + +statement ok +copy into @s1/t1 from t1; + +statement ok +copy into @s1/t2 from t2 + +query +select a, b from @s1/t1; +---- +1 2 + +query error column B doesn't exist +select a, `B` from @s1/t1; + +query error column b doesn't exist +select a, b from @s1/t1(case_sensitive=>true); + +query +select a, `B` from @s1/t1(case_sensitive=>true); +---- +1 2 + +query +copy into t1 from @s1/t1 return_failed_only=true force=true + +query +copy into t1 from @s1/t1 return_failed_only=true column_match_mode=case_sensitive force=true + +query error must be one of +copy into t1 from @s1/t1 return_failed_only=true 
column_match_mode=CaseSensitive force=true + +query error ambiguous +select a from @s1/t2; + +query +select a, `A` from @s1/t2(case_sensitive=>true); +---- +1 2 + +query error ambiguous +copy into t2 from @s1/t2 return_failed_only=true + +query +copy into t2 from @s1/t2 return_failed_only=true column_match_mode=case_sensitive force=true + +statement ok +truncate table t1 + +query +copy into t1 from @s1/t1 return_failed_only=true file_format=(type=Parquet, missing_field_as=null) force=true + +query +select * from t1 +---- +1 2 + +statement ok +truncate table t1 + +query +copy into t1 from @s1/t1 return_failed_only=true column_match_mode=case_sensitive file_format=(type=Parquet, missing_field_as=null) force=true + +query +select * from t1 +---- +1 2 + +statement ok +truncate table t2 + +query error multi field named +copy into t2 from @s1/t2 return_failed_only=true file_format=(type=Parquet, missing_field_as=null) force=true + +query +copy into t2 from @s1/t2 return_failed_only=true column_match_mode=case_sensitive file_format=(type=Parquet, missing_field_as=null) force=true + +query +select * from t2 +---- +1 2 \ No newline at end of file
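
A minimal usage sketch of the options this patch introduces, following the statements exercised in the new case_sensitive_parquet.test above; the stage `s1`, table `t1`, and column names are the illustrative names from that test, not a general API reference.

    -- Match Parquet columns to target table columns case-sensitively during COPY.
    COPY INTO t1 FROM @s1/t1
        FILE_FORMAT = (TYPE = PARQUET)
        COLUMN_MATCH_MODE = CASE_SENSITIVE
        FORCE = TRUE;

    -- Preserve original column casing when querying a staged Parquet file directly.
    SELECT a, `B` FROM @s1/t1 (CASE_SENSITIVE => TRUE);

Without COLUMN_MATCH_MODE, matching remains case-insensitive (Parquet field names are lowercased as before), and check_copy_options rejects COLUMN_MATCH_MODE = POSITION for Parquet as well as the option for any non-Parquet format.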