Skip to content

Commit

Permalink
Fix failure caused by VALUES rows with different data types (#10445)
Browse files Browse the repository at this point in the history
* Fix failure caused by VALUES rows with different data types

* fix tests

* fix tests

* fix tests

* fix tests

* fix tests

* fix tests

* add `list_coercion`

* fix review suggestions
  • Loading branch information
b41sh authored May 14, 2024
1 parent cd36ee3 commit dbd0186
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 40 deletions.
67 changes: 37 additions & 30 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::logical_plan::{
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
Window,
};
use crate::type_coercion::binary::comparison_coercion;
use crate::type_coercion::binary::{comparison_coercion, values_coercion};
use crate::utils::{
can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
expand_wildcard, find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
Expand Down Expand Up @@ -173,13 +173,6 @@ impl LogicalPlanBuilder {
if n_cols == 0 {
return plan_err!("Values list cannot be zero length");
}
let empty_schema = DFSchema::empty();
let mut field_types: Vec<Option<DataType>> = Vec::with_capacity(n_cols);
for _ in 0..n_cols {
field_types.push(None);
}
// hold all the null holes so that we can correct their data types later
let mut nulls: Vec<(usize, usize)> = Vec::new();
for (i, row) in values.iter().enumerate() {
if row.len() != n_cols {
return plan_err!(
Expand All @@ -189,37 +182,50 @@ impl LogicalPlanBuilder {
n_cols
);
}
field_types = row
.iter()
.enumerate()
.map(|(j, expr)| {
if let Expr::Literal(ScalarValue::Null) = expr {
nulls.push((i, j));
Ok(field_types[j].clone())
} else {
let data_type = expr.get_type(&empty_schema)?;
if let Some(prev_data_type) = &field_types[j] {
if prev_data_type != &data_type {
return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_data_type} but found {data_type}")
}
}
Ok(Some(data_type))
}
})
.collect::<Result<Vec<Option<DataType>>>>()?;
}

let empty_schema = DFSchema::empty();
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
for j in 0..n_cols {
let mut common_type: Option<DataType> = None;
for (i, row) in values.iter().enumerate() {
let value = &row[j];
let data_type = value.get_type(&empty_schema)?;
if data_type == DataType::Null {
continue;
}
if let Some(prev_type) = common_type {
// get common type of each column values.
let Some(new_type) = values_coercion(&data_type, &prev_type) else {
return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
};
common_type = Some(new_type);
} else {
common_type = Some(data_type.clone());
}
}
field_types.push(common_type.unwrap_or(DataType::Utf8));
}
// wrap cast if data type is not same as common type.
for row in &mut values {
for (j, field_type) in field_types.iter().enumerate() {
if let Expr::Literal(ScalarValue::Null) = row[j] {
row[j] = Expr::Literal(ScalarValue::try_from(field_type.clone())?);
} else {
row[j] =
std::mem::take(&mut row[j]).cast_to(field_type, &empty_schema)?;
}
}
}
let fields = field_types
.iter()
.enumerate()
.map(|(j, data_type)| {
// naming is following convention https://www.postgresql.org/docs/current/queries-values.html
let name = &format!("column{}", j + 1);
Field::new(name, data_type.clone().unwrap_or(DataType::Utf8), true)
Field::new(name, data_type.clone(), true)
})
.collect::<Vec<_>>();
for (i, j) in nulls {
values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
}
let dfschema = DFSchema::from_unqualifed_fields(fields.into(), HashMap::new())?;
let schema = DFSchemaRef::new(dfschema);
Ok(Self::from(LogicalPlan::Values(Values { schema, values })))
Expand Down Expand Up @@ -2146,6 +2152,7 @@ mod tests {

Ok(())
}

#[test]
fn test_change_redundant_column() -> Result<()> {
let t1_field_1 = Field::new("a", DataType::Int32, false);
Expand Down
43 changes: 34 additions & 9 deletions datafusion/expr/src/type_coercion/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,25 @@ pub fn comparison_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<D
.or_else(|| dictionary_coercion(lhs_type, rhs_type, true))
.or_else(|| temporal_coercion(lhs_type, rhs_type))
.or_else(|| string_coercion(lhs_type, rhs_type))
.or_else(|| list_coercion(lhs_type, rhs_type))
.or_else(|| null_coercion(lhs_type, rhs_type))
.or_else(|| string_numeric_coercion(lhs_type, rhs_type))
.or_else(|| string_temporal_coercion(lhs_type, rhs_type))
.or_else(|| binary_coercion(lhs_type, rhs_type))
}

/// Find a common type that both `lhs_type` and `rhs_type` can be coerced to
/// when they are used as value expressions (e.g. entries of a `VALUES` list).
///
/// Identical types are returned unchanged. Otherwise the following rules are
/// tried in order, and the first one that yields a type wins:
/// `comparison_binary_numeric_coercion`, `temporal_coercion`,
/// `string_coercion`, `binary_coercion`.
///
/// Returns `None` when none of the rules produces a common type.
pub fn values_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
    if lhs_type != rhs_type {
        // The types differ: fall through the coercion rules, stopping at the first match.
        if let Some(common) = comparison_binary_numeric_coercion(lhs_type, rhs_type) {
            return Some(common);
        }
        if let Some(common) = temporal_coercion(lhs_type, rhs_type) {
            return Some(common);
        }
        if let Some(common) = string_coercion(lhs_type, rhs_type) {
            return Some(common);
        }
        return binary_coercion(lhs_type, rhs_type);
    }
    // Both sides already share a type, so no coercion is required.
    Some(lhs_type.clone())
}

/// Coerce `lhs_type` and `rhs_type` to a common type for the purposes of a comparison operation
/// where one is numeric and one is `Utf8`/`LargeUtf8`.
fn string_numeric_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
Expand Down Expand Up @@ -665,15 +678,17 @@ fn dictionary_coercion(
/// 2. Data type of the other side should be able to cast to string type
fn string_concat_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
string_coercion(lhs_type, rhs_type).or(match (lhs_type, rhs_type) {
(Utf8, from_type) | (from_type, Utf8) => {
string_concat_internal_coercion(from_type, &Utf8)
}
(LargeUtf8, from_type) | (from_type, LargeUtf8) => {
string_concat_internal_coercion(from_type, &LargeUtf8)
}
_ => None,
})
string_coercion(lhs_type, rhs_type)
.or_else(|| list_coercion(lhs_type, rhs_type))
.or(match (lhs_type, rhs_type) {
(Utf8, from_type) | (from_type, Utf8) => {
string_concat_internal_coercion(from_type, &Utf8)
}
(LargeUtf8, from_type) | (from_type, LargeUtf8) => {
string_concat_internal_coercion(from_type, &LargeUtf8)
}
_ => None,
})
}

fn array_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
Expand Down Expand Up @@ -706,6 +721,14 @@ fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType>
(LargeUtf8, Utf8) => Some(LargeUtf8),
(Utf8, LargeUtf8) => Some(LargeUtf8),
(LargeUtf8, LargeUtf8) => Some(LargeUtf8),
_ => None,
}
}

/// Coercion rules for list types.
fn list_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
match (lhs_type, rhs_type) {
// TODO: cast between array elements (#6558)
(List(_), List(_)) => Some(lhs_type.clone()),
(List(_), _) => Some(lhs_type.clone()),
Expand Down Expand Up @@ -752,6 +775,7 @@ fn binary_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType>
/// This is a union of string coercion rules and dictionary coercion rules
pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
string_coercion(lhs_type, rhs_type)
.or_else(|| list_coercion(lhs_type, rhs_type))
.or_else(|| binary_to_string_coercion(lhs_type, rhs_type))
.or_else(|| dictionary_coercion(lhs_type, rhs_type, false))
.or_else(|| null_coercion(lhs_type, rhs_type))
Expand All @@ -761,6 +785,7 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataTyp
/// This is a union of string coercion rules, list coercion rules and dictionary coercion rules
pub fn regex_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
    // Rules are tried in priority order; the first one that yields a type wins.
    if let Some(common) = string_coercion(lhs_type, rhs_type) {
        return Some(common);
    }
    if let Some(common) = list_coercion(lhs_type, rhs_type) {
        return Some(common);
    }
    // Last resort: dictionary coercion, called with its third argument set to `false`.
    dictionary_coercion(lhs_type, rhs_type, false)
}

Expand Down
12 changes: 11 additions & 1 deletion datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ VALUES (1),(1,2)
statement error DataFusion error: Error during planning: Inconsistent data type across values list at row 1 column 0
VALUES (1),('2')

statement error DataFusion error: Error during planning: Inconsistent data type across values list at row 1 column 0
query R
VALUES (1),(2.0)
----
1
2

statement error DataFusion error: Error during planning: Inconsistent data type across values list at row 1 column 1
VALUES (1,2), (1,'2')
Expand Down Expand Up @@ -473,6 +476,13 @@ CREATE TABLE foo AS VALUES
(3, 4),
(5, 6);

# multiple rows and columns need type coercion
statement ok
CREATE TABLE foo2(c1 double, c2 double) AS VALUES
(1.1, 4.1),
(2, 5),
(3, 6);

# foo distinct
query T
select distinct '1' from foo;
Expand Down

0 comments on commit dbd0186

Please sign in to comment.