From 18b2aaa04d956475ddc74fbbde3725370e2d7bde Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Thu, 24 Oct 2024 08:21:48 +0800 Subject: [PATCH] Infer data type from schema for `Values` and add struct coercion to `coalesce` (#12864) * first draft Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * add values table without schema Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * fmt Signed-off-by: jayzhan211 * rm unused import Signed-off-by: jayzhan211 * fmt Signed-off-by: jayzhan211 * use option instead of vec Signed-off-by: jayzhan211 * Fix clippy * add values back and rename Signed-off-by: jayzhan211 * invalid query Signed-off-by: jayzhan211 * use values if no schema Signed-off-by: jayzhan211 * add doc Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 Co-authored-by: Andrew Lamb --- datafusion-cli/Cargo.lock | 1 + datafusion/common/src/dfschema.rs | 1 - datafusion/expr-common/Cargo.toml | 1 + .../expr-common/src/type_coercion/binary.rs | 90 ++++++++++++++++- datafusion/expr/src/logical_plan/builder.rs | 97 +++++++++++++++++-- datafusion/functions-nested/src/make_array.rs | 64 +++--------- datafusion/functions/src/core/coalesce.rs | 7 +- datafusion/proto/src/logical_plan/mod.rs | 1 + datafusion/sql/src/planner.rs | 15 +++ datafusion/sql/src/statement.rs | 11 ++- datafusion/sql/src/values.rs | 15 ++- datafusion/sqllogictest/test_files/array.slt | 1 - .../test_files/create_external_table.slt | 1 - datafusion/sqllogictest/test_files/ddl.slt | 6 ++ .../sqllogictest/test_files/group_by.slt | 12 +-- datafusion/sqllogictest/test_files/joins.slt | 33 ++++--- datafusion/sqllogictest/test_files/struct.slt | 94 +++++++++++++++--- .../sqllogictest/test_files/subquery.slt | 40 +++++--- datafusion/sqllogictest/test_files/unnest.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 2 +- 20 files changed, 368 insertions(+), 126 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 401f203dd931..24649832b27e 
100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1359,6 +1359,7 @@ version = "42.1.0" dependencies = [ "arrow", "datafusion-common", + "itertools", "paste", ] diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 9a1fe9bba267..aa2d93989da1 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -315,7 +315,6 @@ impl DFSchema { None => self_unqualified_names.contains(field.name().as_str()), }; if !duplicated_field { - // self.inner.fields.push(field.clone()); schema_builder.push(Arc::clone(field)); qualifiers.push(qualifier.cloned()); } diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml index 7e477efc4ebc..de11b19c3b06 100644 --- a/datafusion/expr-common/Cargo.toml +++ b/datafusion/expr-common/Cargo.toml @@ -40,4 +40,5 @@ path = "src/lib.rs" [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true } +itertools = { workspace = true } paste = "^1.0" diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 887586f4f783..2f806bf76d16 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -28,7 +28,10 @@ use arrow::datatypes::{ DataType, Field, FieldRef, Fields, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; -use datafusion_common::{exec_datafusion_err, plan_datafusion_err, plan_err, Result}; +use datafusion_common::{ + exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err, Result, +}; +use itertools::Itertools; /// The type signature of an instantiation of binary operator expression such as /// `lhs + rhs` @@ -372,6 +375,8 @@ impl From<&DataType> for TypeCategory { /// decimal precision and scale when coercing decimal types. 
/// /// This function doesn't preserve correct field name and nullability for the struct type, we only care about data type. +/// +/// Returns Option because we might want to continue on the code even if the data types are not coercible to the common type pub fn type_union_resolution(data_types: &[DataType]) -> Option { if data_types.is_empty() { return None; @@ -529,6 +534,89 @@ fn type_union_resolution_coercion( } } +/// Handle type union resolution including struct type and others. +pub fn try_type_union_resolution(data_types: &[DataType]) -> Result> { + let err = match try_type_union_resolution_with_struct(data_types) { + Ok(struct_types) => return Ok(struct_types), + Err(e) => Some(e), + }; + + if let Some(new_type) = type_union_resolution(data_types) { + Ok(vec![new_type; data_types.len()]) + } else { + exec_err!("Fail to find the coerced type, errors: {:?}", err) + } +} + +// Handle struct where we only change the data type but preserve the field name and nullability. +// Since field name is the key of the struct, so it shouldn't be updated to the common column name like "c0" or "c1" +pub fn try_type_union_resolution_with_struct( + data_types: &[DataType], +) -> Result> { + let mut keys_string: Option = None; + for data_type in data_types { + if let DataType::Struct(fields) = data_type { + let keys = fields.iter().map(|f| f.name().to_owned()).join(","); + if let Some(ref k) = keys_string { + if *k != keys { + return exec_err!("Expect same keys for struct type but got mismatched pair {} and {}", *k, keys); + } + } else { + keys_string = Some(keys); + } + } else { + return exec_err!("Expect to get struct but got {}", data_type); + } + } + + let mut struct_types: Vec = if let DataType::Struct(fields) = &data_types[0] + { + fields.iter().map(|f| f.data_type().to_owned()).collect() + } else { + return internal_err!("Struct type is checked is the previous function, so this should be unreachable"); + }; + + for data_type in data_types.iter().skip(1) { + if let 
DataType::Struct(fields) = data_type { + let incoming_struct_types: Vec = + fields.iter().map(|f| f.data_type().to_owned()).collect(); + // The order of field is verified above + for (lhs_type, rhs_type) in + struct_types.iter_mut().zip(incoming_struct_types.iter()) + { + if let Some(coerced_type) = + type_union_resolution_coercion(lhs_type, rhs_type) + { + *lhs_type = coerced_type; + } else { + return exec_err!( + "Fail to find the coerced type for {} and {}", + lhs_type, + rhs_type + ); + } + } + } else { + return exec_err!("Expect to get struct but got {}", data_type); + } + } + + let mut final_struct_types = vec![]; + for s in data_types { + let mut new_fields = vec![]; + if let DataType::Struct(fields) = s { + for (i, f) in fields.iter().enumerate() { + let field = Arc::unwrap_or_clone(Arc::clone(f)) + .with_data_type(struct_types[i].to_owned()); + new_fields.push(Arc::new(field)); + } + } + final_struct_types.push(DataType::Struct(new_fields.into())) + } + + Ok(final_struct_types) +} + /// Coerce `lhs_type` and `rhs_type` to a common type for the purposes of a /// comparison operation /// diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 21304068a8ab..d2ecd56cdc23 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -46,13 +46,15 @@ use crate::{ use super::dml::InsertOp; use super::plan::ColumnUnnestList; +use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ - get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, - plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies, - Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, + exec_err, get_target_functional_dependencies, internal_err, 
not_impl_err, + plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, + FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema, + UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; @@ -172,12 +174,45 @@ impl LogicalPlanBuilder { /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. /// + /// so it's usually better to override the default names with a table alias list. + /// + /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided. + pub fn values(values: Vec>) -> Result { + if values.is_empty() { + return plan_err!("Values list cannot be empty"); + } + let n_cols = values[0].len(); + if n_cols == 0 { + return plan_err!("Values list cannot be zero length"); + } + for (i, row) in values.iter().enumerate() { + if row.len() != n_cols { + return plan_err!( + "Inconsistent data length across values list: got {} values in row {} but expected {}", + row.len(), + i, + n_cols + ); + } + } + + // Infer from data itself + Self::infer_data(values) + } + + /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming + /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) + /// documentation for more details. + /// /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table. /// The column names are not specified by the SQL standard and different database systems do it differently, /// so it's usually better to override the default names with a table alias list. /// /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided. 
- pub fn values(mut values: Vec>) -> Result { + pub fn values_with_schema( + values: Vec>, + schema: &DFSchemaRef, + ) -> Result { if values.is_empty() { return plan_err!("Values list cannot be empty"); } @@ -196,16 +231,53 @@ impl LogicalPlanBuilder { } } - let empty_schema = DFSchema::empty(); + // Check the type of value against the schema + Self::infer_values_from_schema(values, schema) + } + + fn infer_values_from_schema( + values: Vec>, + schema: &DFSchema, + ) -> Result { + let n_cols = values[0].len(); + let mut field_types: Vec = Vec::with_capacity(n_cols); + for j in 0..n_cols { + let field_type = schema.field(j).data_type(); + for row in values.iter() { + let value = &row[j]; + let data_type = value.get_type(schema)?; + + if !data_type.equals_datatype(field_type) { + if can_cast_types(&data_type, field_type) { + } else { + return exec_err!( + "type mistmatch and can't cast to got {} and {}", + data_type, + field_type + ); + } + } + } + field_types.push(field_type.to_owned()); + } + + Self::infer_inner(values, &field_types, schema) + } + + fn infer_data(values: Vec>) -> Result { + let n_cols = values[0].len(); + let schema = DFSchema::empty(); + let mut field_types: Vec = Vec::with_capacity(n_cols); for j in 0..n_cols { let mut common_type: Option = None; for (i, row) in values.iter().enumerate() { let value = &row[j]; - let data_type = value.get_type(&empty_schema)?; + let data_type = value.get_type(&schema)?; if data_type == DataType::Null { continue; } + if let Some(prev_type) = common_type { // get common type of each column values. let data_types = vec![prev_type.clone(), data_type.clone()]; @@ -221,14 +293,22 @@ impl LogicalPlanBuilder { // since the code loop skips NULL field_types.push(common_type.unwrap_or(DataType::Null)); } + + Self::infer_inner(values, &field_types, &schema) + } + + fn infer_inner( + mut values: Vec>, + field_types: &[DataType], + schema: &DFSchema, + ) -> Result { // wrap cast if data type is not same as common type. 
for row in &mut values { for (j, field_type) in field_types.iter().enumerate() { if let Expr::Literal(ScalarValue::Null) = row[j] { row[j] = Expr::Literal(ScalarValue::try_from(field_type)?); } else { - row[j] = - std::mem::take(&mut row[j]).cast_to(field_type, &empty_schema)?; + row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?; } } } @@ -243,6 +323,7 @@ impl LogicalPlanBuilder { .collect::>(); let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?; let schema = DFSchemaRef::new(dfschema); + Ok(Self::new(LogicalPlan::Values(Values { schema, values }))) } diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs index efc14cbbe519..abd7649e9ec7 100644 --- a/datafusion/functions-nested/src/make_array.rs +++ b/datafusion/functions-nested/src/make_array.rs @@ -28,15 +28,15 @@ use arrow_array::{ use arrow_buffer::OffsetBuffer; use arrow_schema::DataType::{LargeList, List, Null}; use arrow_schema::{DataType, Field}; -use datafusion_common::{exec_err, internal_err}; use datafusion_common::{plan_err, utils::array_into_list_array_nullable, Result}; -use datafusion_expr::binary::type_union_resolution; +use datafusion_expr::binary::{ + try_type_union_resolution_with_struct, type_union_resolution, +}; use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY; use datafusion_expr::TypeSignature; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; -use itertools::Itertools; use crate::utils::make_scalar_function; @@ -111,33 +111,16 @@ impl ScalarUDFImpl for MakeArray { } fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - if let Some(new_type) = type_union_resolution(arg_types) { - // TODO: Move the logic to type_union_resolution if this applies to other functions as well - // Handle struct where we only change the data type but preserve the field name and nullability. 
- // Since field name is the key of the struct, so it shouldn't be updated to the common column name like "c0" or "c1" - let is_struct_and_has_same_key = are_all_struct_and_have_same_key(arg_types)?; - if is_struct_and_has_same_key { - let data_types: Vec<_> = if let DataType::Struct(fields) = &arg_types[0] { - fields.iter().map(|f| f.data_type().to_owned()).collect() - } else { - return internal_err!("Struct type is checked is the previous function, so this should be unreachable"); - }; - - let mut final_struct_types = vec![]; - for s in arg_types { - let mut new_fields = vec![]; - if let DataType::Struct(fields) = s { - for (i, f) in fields.iter().enumerate() { - let field = Arc::unwrap_or_clone(Arc::clone(f)) - .with_data_type(data_types[i].to_owned()); - new_fields.push(Arc::new(field)); - } - } - final_struct_types.push(DataType::Struct(new_fields.into())) - } - return Ok(final_struct_types); + let mut errors = vec![]; + match try_type_union_resolution_with_struct(arg_types) { + Ok(r) => return Ok(r), + Err(e) => { + errors.push(e); } + } + if let Some(new_type) = type_union_resolution(arg_types) { + // TODO: Move FixedSizeList to List in type_union_resolution if let DataType::FixedSizeList(field, _) = new_type { Ok(vec![DataType::List(field); arg_types.len()]) } else if new_type.is_null() { @@ -147,9 +130,10 @@ impl ScalarUDFImpl for MakeArray { } } else { plan_err!( - "Fail to find the valid type between {:?} for {}", + "Fail to find the valid type between {:?} for {}, errors are {:?}", arg_types, - self.name() + self.name(), + errors ) } } @@ -188,26 +172,6 @@ fn get_make_array_doc() -> &'static Documentation { }) } -fn are_all_struct_and_have_same_key(data_types: &[DataType]) -> Result { - let mut keys_string: Option = None; - for data_type in data_types { - if let DataType::Struct(fields) = data_type { - let keys = fields.iter().map(|f| f.name().to_owned()).join(","); - if let Some(ref k) = keys_string { - if *k != keys { - return exec_err!("Expect same 
keys for struct type but got mismatched pair {} and {}", *k, keys); - } - } else { - keys_string = Some(keys); - } - } else { - return Ok(false); - } - } - - Ok(true) -} - // Empty array is a special case that is useful for many other array functions pub(super) fn empty_array_type() -> DataType { DataType::List(Arc::new(Field::new("item", DataType::Int64, true))) diff --git a/datafusion/functions/src/core/coalesce.rs b/datafusion/functions/src/core/coalesce.rs index 15cd733a8cd6..a05f3f08232c 100644 --- a/datafusion/functions/src/core/coalesce.rs +++ b/datafusion/functions/src/core/coalesce.rs @@ -20,8 +20,8 @@ use arrow::compute::kernels::zip::zip; use arrow::compute::{and, is_not_null, is_null}; use arrow::datatypes::DataType; use datafusion_common::{exec_err, ExprSchema, Result}; +use datafusion_expr::binary::try_type_union_resolution; use datafusion_expr::scalar_doc_sections::DOC_SECTION_CONDITIONAL; -use datafusion_expr::type_coercion::binary::type_union_resolution; use datafusion_expr::{ColumnarValue, Documentation, Expr, ExprSchemable}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use itertools::Itertools; @@ -137,9 +137,8 @@ impl ScalarUDFImpl for CoalesceFunc { if arg_types.is_empty() { return exec_err!("coalesce must have at least one argument"); } - let new_type = type_union_resolution(arg_types) - .unwrap_or(arg_types.first().unwrap().clone()); - Ok(vec![new_type; arg_types.len()]) + + try_type_union_resolution(arg_types) } fn documentation(&self) -> Option<&Documentation> { diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index f57910b09ade..4adbb9318d51 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -281,6 +281,7 @@ impl AsLogicalPlan for LogicalPlanNode { .collect::, _>>() .map_err(|e| e.into()) }?; + LogicalPlanBuilder::values(values)?.build() } LogicalPlanType::Projection(projection) => { diff --git 
a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 66e360a9ade9..072d2320fccf 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -138,6 +138,8 @@ pub struct PlannerContext { /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, + /// The query schema defined by the table + create_table_schema: Option, } impl Default for PlannerContext { @@ -154,6 +156,7 @@ impl PlannerContext { ctes: HashMap::new(), outer_query_schema: None, outer_from_schema: None, + create_table_schema: None, } } @@ -181,6 +184,18 @@ impl PlannerContext { schema } + pub fn set_table_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.create_table_schema, &mut schema); + schema + } + + pub fn table_schema(&self) -> Option { + self.create_table_schema.clone() + } + // Return a clone of the outer FROM schema pub fn outer_from_schema(&self) -> Option> { self.outer_from_schema.clone() diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 60e3413b836f..29852be3bf77 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -394,13 +394,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Build column default values let column_defaults = self.build_column_defaults(&columns, planner_context)?; + + let has_columns = !columns.is_empty(); + let schema = self.build_schema(columns)?.to_dfschema_ref()?; + if has_columns { + planner_context.set_table_schema(Some(Arc::clone(&schema))); + } + match query { Some(query) => { let plan = self.query_to_plan(*query, planner_context)?; let input_schema = plan.schema(); - let plan = if !columns.is_empty() { - let schema = self.build_schema(columns)?.to_dfschema_ref()?; + let plan = if has_columns { if schema.fields().len() != input_schema.fields().len() { return plan_err!( "Mismatch: {} columns 
specified, but result has {} columns", @@ -447,7 +453,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } None => { - let schema = self.build_schema(columns)?.to_dfschema_ref()?; let plan = EmptyRelation { produce_one_row: false, schema, diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs index cd33ddb3cfe7..a4001bea7dea 100644 --- a/datafusion/sql/src/values.rs +++ b/datafusion/sql/src/values.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; @@ -31,16 +33,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { rows, } = values; - // Values should not be based on any other schema - let schema = DFSchema::empty(); + let empty_schema = Arc::new(DFSchema::empty()); let values = rows .into_iter() .map(|row| { row.into_iter() - .map(|v| self.sql_to_expr(v, &schema, planner_context)) + .map(|v| self.sql_to_expr(v, &empty_schema, planner_context)) .collect::>>() }) .collect::>>()?; - LogicalPlanBuilder::values(values)?.build() + + let schema = planner_context.table_schema().unwrap_or(empty_schema); + if schema.fields().is_empty() { + LogicalPlanBuilder::values(values)?.build() + } else { + LogicalPlanBuilder::values_with_schema(values, &schema)?.build() + } } } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 69f62057c761..bfdbfb1bcc5e 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -7288,4 +7288,3 @@ drop table values_all_empty; statement ok drop table fixed_size_col_table; - diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 7dba4d01d63b..ed001cf9f84c 100644 --- 
a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -283,4 +283,3 @@ CREATE EXTERNAL TABLE staging.foo STORED AS parquet LOCATION '../../parquet-test # Create external table with qualified name, but no schema should error statement error DataFusion error: Error during planning: failed to resolve schema: release CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet'; - diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 813f7e95adf0..3205920d7110 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -799,3 +799,9 @@ CREATE EXTERNAL TEMPORARY TABLE tty STORED as ARROW LOCATION '../core/tests/data statement error DataFusion error: This feature is not implemented: Temporary views not supported CREATE TEMPORARY VIEW y AS VALUES (1,2,3); + +query error DataFusion error: Schema error: No field named a\. +EXPLAIN CREATE TABLE t(a int) AS VALUES (a + a); + +statement error DataFusion error: Schema error: No field named a\. 
+CREATE TABLE t(a int) AS SELECT x FROM (VALUES (a)) t(x) WHERE false; \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 4f2778b5c0d1..61b3ad73cd0a 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3360,7 +3360,8 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=4 06)----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8 07)------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[] -08)--------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +09)----------------MemoryExec: partitions=1, partition_sizes=[1] query IRI SELECT s.sn, s.amount, 2*s.sn @@ -3430,9 +3431,9 @@ physical_plan 07)------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[sum(l.amount)] 08)--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount] 09)----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1 -10)------------------CoalescePartitionsExec -11)--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] -12)------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +12)--------------------MemoryExec: partitions=1, partition_sizes=[1] query IRR SELECT r.sn, SUM(l.amount), r.amount @@ -3579,8 +3580,7 @@ physical_plan 08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 09)----------------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, 
sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@6 as sum_amount] 10)------------------BoundedWindowAggExec: wdw=[sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] -11)--------------------CoalescePartitionsExec -12)----------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +11)--------------------MemoryExec: partitions=1, partition_sizes=[1] query ITIPTRR diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 558a9170c7d3..af272e8f5022 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3901,8 +3901,8 @@ SELECT * FROM ( ) AS rhs ON lhs.b=rhs.b ---- 11 1 21 1 -14 2 22 2 12 3 23 3 +14 2 22 2 15 4 24 4 query TT @@ -3922,11 +3922,12 @@ logical_plan 05)----Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10 06)------TableScan: right_table_no_nulls projection=[a, b] physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] -05)------MemoryExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(b@1, b@1)] +04)------SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------MemoryExec: partitions=1, partition_sizes=[1] @@ -3979,10 +3980,11 @@ logical_plan 
04)--SubqueryAlias: rhs 05)----TableScan: right_table_no_nulls projection=[a, b] physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----MemoryExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(b@1, b@1)] +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)------MemoryExec: partitions=1, partition_sizes=[1] # Null build indices: @@ -4038,11 +4040,12 @@ logical_plan 05)----Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10 06)------TableScan: right_table_no_nulls projection=[a, b] physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] -05)------MemoryExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(b@1, b@1)] +04)------SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------MemoryExec: partitions=1, partition_sizes=[1] # Test CROSS JOIN LATERAL syntax (planning) diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index b76c78396aed..7596b820c688 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -392,12 +392,12 @@ create table t(a struct, b struct) as valu query T select arrow_typeof([a, b]) from t; ---- -List(Field { name: "item", data_type: Struct([Field { name: "r", 
data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) +List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) query ? select [a, b] from t; ---- -[{r: red, c: 1}, {r: blue, c: 2}] +[{r: red, c: 1.0}, {r: blue, c: 2.3}] statement ok drop table t; @@ -453,6 +453,27 @@ Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ statement ok drop table t; +statement ok +create table t as values({r: 'a', c: 1}), ({r: 'b', c: 2.3}); + +query ? +select * from t; +---- +{c0: a, c1: 1.0} +{c0: b, c1: 2.3} + +query T +select arrow_typeof(column1) from t; +---- +Struct([Field { name: "c0", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "c0", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) + +statement ok +drop table t; + +query error DataFusion error: Arrow error: Cast error: Cannot cast string 'a' to value of Float64 type +create table t as values({r: 'a', c: 1}), ({c: 2.3, r: 'b'}); + ################################## ## Test Coalesce with Struct ################################## @@ -474,13 +495,12 @@ select coalesce(s1) from t; {a: 2, b: blue} {a: 3, b: green} -# TODO: a's type should be float query T -select arrow_typeof(coalesce(s1)) from t; +select 
arrow_typeof(coalesce(s1, s2)) from t; ---- -Struct([Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) -Struct([Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) -Struct([Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) statement ok drop table t; @@ -495,26 +515,32 @@ CREATE TABLE t ( (row(3, 'green'), row(33.2, 'string3')) ; -# TODO: second column should not be null query ? 
-select coalesce(s1) from t; +select coalesce(s1, s2) from t; ---- -{a: 1, b: red} -NULL -{a: 3, b: green} +{a: 1.0, b: red} +{a: 2.2, b: string2} +{a: 3.0, b: green} + +query T +select arrow_typeof(coalesce(s1, s2)) from t; +---- +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) statement ok drop table t; # row() with incorrect order -statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'blue' to value of Float64 type +statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'blue' to value of Float32 type create table t(a struct(r varchar, c int), b struct(r varchar, c float)) as values (row('red', 1), row(2.3, 'blue')), (row('purple', 1), row('green', 2.3)); # out of order struct literal # TODO: This query should not fail -statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'a' to value of Int64 type +statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'b' to value of Int32 type create table t(a struct(r varchar, c int)) as values ({r: 'a', c: 1}), ({c: 2, r: 'b'}); ################################## @@ -529,3 +555,43 @@ select [{r: 'a', c: 1}, {r: 'b', c: 2}]; # Can't create a list of struct with different field types query error select [{r: 'a', c: 1}, {c: 2, r: 'b'}]; + +statement ok +create table t(a struct(r varchar, c int), b struct(r varchar, c float)) as 
values (row('a', 1), row('b', 2.3)); + +query T +select arrow_typeof([a, b]) from t; +---- +List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) + +statement ok +drop table t; + +# create table with different struct type is fine +statement ok +create table t(a struct(r varchar, c int), b struct(c float, r varchar)) as values (row('a', 1), row(2.3, 'b')); + +# create array with different struct type is not valid +query error +select arrow_typeof([a, b]) from t; + +statement ok +drop table t; + +statement ok +create table t(a struct(r varchar, c int, g float), b struct(r varchar, c float, g int)) as values (row('a', 1, 2.3), row('b', 2.3, 2)); + +# type of each column should not be coerced but preserved as it is +query T +select arrow_typeof(a) from t; +---- +Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "g", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) + +# type of each column should not be coerced but preserved as it is +query T +select arrow_typeof(b) from t; +---- +Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "g", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) + +statement ok +drop table t; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index ab6dc3a9e588..6b142302a543 100644 ---
a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -208,10 +208,12 @@ physical_plan 06)----------CoalesceBatchesExec: target_batch_size=2 07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int)] -09)----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -10)------CoalesceBatchesExec: target_batch_size=2 -11)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -12)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------CoalesceBatchesExec: target_batch_size=2 +12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +13)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +14)------------MemoryExec: partitions=1, partition_sizes=[1] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1 @@ -242,10 +244,12 @@ physical_plan 06)----------CoalesceBatchesExec: target_batch_size=2 07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int * Float64(1))] -09)----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -10)------CoalesceBatchesExec: target_batch_size=2 -11)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -12)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------CoalesceBatchesExec: target_batch_size=2 +12)--------RepartitionExec: 
partitioning=Hash([t1_id@0], 4), input_partitions=4 +13)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +14)------------MemoryExec: partitions=1, partition_sizes=[1] query IR rowsort SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1 @@ -276,10 +280,12 @@ physical_plan 06)----------CoalesceBatchesExec: target_batch_size=2 07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int)] -09)----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -10)------CoalesceBatchesExec: target_batch_size=2 -11)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -12)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------CoalesceBatchesExec: target_batch_size=2 +12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +13)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +14)------------MemoryExec: partitions=1, partition_sizes=[1] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1 @@ -313,10 +319,12 @@ physical_plan 08)--------------CoalesceBatchesExec: target_batch_size=2 09)----------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 10)------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int)] -11)--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -12)------CoalesceBatchesExec: target_batch_size=2 -13)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -14)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] 
+11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +12)----------------------MemoryExec: partitions=1, partition_sizes=[1] +13)------CoalesceBatchesExec: target_batch_size=2 +14)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +15)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +16)------------MemoryExec: partitions=1, partition_sizes=[1] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1 diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index b923e94fc819..947eb8630b52 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -643,7 +643,7 @@ NULL [4] [{c0: [2], c1: [[3], [4]]}] 4 [3] [{c0: [2], c1: [[3], [4]]}] NULL [4] [{c0: [2], c1: [[3], [4]]}] -## demonstrate where recursive unnest is impossible +## demonstrate where recursive unnest is impossible ## and need multiple unnesting logical plans ## e.g unnest -> field_access -> unnest query TT diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 51e859275512..95d850795772 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5021,4 +5021,4 @@ NULL statement ok DROP TABLE t1; -## end test handle NULL of lead \ No newline at end of file +## end test handle NULL of lead