diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index c032a321e1..70087fb3b3 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -33,6 +33,7 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::Instant; +use arrow_schema::{DataType, Field}; use async_trait::async_trait; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; @@ -56,7 +57,6 @@ use datafusion_expr::{ use filter::try_construct_early_filter; use futures::future::BoxFuture; -use itertools::Itertools; use parquet::file::properties::WriterProperties; use serde::Serialize; use tracing::log::*; @@ -719,7 +719,6 @@ async fn execute( // be disabled in the common case(s) let should_cdc = should_write_cdc(&snapshot)?; // Change data may be collected and then written out at the completion of the merge - let mut change_data = vec![]; if should_cdc { debug!("Executing a merge and I should write CDC!"); @@ -790,7 +789,7 @@ async fn execute( fn add_generated_columns( mut df: DataFrame, generated_cols: &Vec, - generated_cols_missing_in_source: &Vec, + generated_cols_missing_in_source: &[String], state: &SessionState, ) -> DeltaResult { debug!("Generating columns in dataframe"); @@ -1049,6 +1048,7 @@ async fn execute( let mut new_columns = vec![]; let mut write_projection = Vec::new(); + let mut write_projection_with_cdf = Vec::new(); for delta_field in snapshot.schema().fields() { let mut when_expr = Vec::with_capacity(operations_size); @@ -1086,9 +1086,26 @@ async fn execute( Expr::Column(Column::from_name(name.clone())).alias(delta_field.name()), delta_field.data_type().try_into()?, )); + + write_projection_with_cdf.push( + when( + col(CDC_COLUMN_NAME).not_eq(lit("update_preimage")), + cast( + Expr::Column(Column::from_name(name.clone())), + delta_field.data_type().try_into()?, + ), + ) + .otherwise(cast( + Expr::Column(Column::new(Some(target_name.clone()), delta_field.name())), // We take the column from target table + delta_field.data_type().try_into()?, + ))? + .alias(delta_field.name()), + ); new_columns.push((name, case)); } + write_projection_with_cdf.push(col("_change_type")); + let mut insert_when = Vec::with_capacity(ops.len()); let mut insert_then = Vec::with_capacity(ops.len()); @@ -1219,93 +1236,59 @@ async fn execute( let operation_count = DataFrame::new(state.clone(), operation_count); - if should_cdc { - // Create a dataframe containing the CDC deletes which are present at this point - change_data.push( - operation_count - .clone() - .filter(col(DELETE_COLUMN))? - .select(write_projection.clone())? - .with_column(CDC_COLUMN_NAME, lit("delete"))?, - ); - } - - let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?; - - if should_cdc { - debug!("The merge should triggere a CDC tracking, computing pre/insert/postimage datasets"); - let cdc_projection = filtered.clone().filter(col(OPERATION_COLUMN).not_eq( - // This is a copy operation, but I'm not sure how to turn that enum into an int - lit(5), - ))?; - - let cdc_insert_df = cdc_projection + let mut projected = if should_cdc { + operation_count .clone() - .filter( - col(SOURCE_COLUMN) - .is_true() - .and(col(TARGET_COLUMN).is_null()), + .with_column( + CDC_COLUMN_NAME, + when(col(TARGET_DELETE_COLUMN).is_null(), lit("delete")) // nulls are equal to True + .when(col(DELETE_COLUMN).is_null(), lit("source_delete")) + .when(col(TARGET_COPY_COLUMN).is_null(), lit("copy")) + .when(col(TARGET_INSERT_COLUMN).is_null(), lit("insert")) + .when(col(TARGET_UPDATE_COLUMN).is_null(), lit("update")) + .end()?, )? - .select(write_projection.clone())? - .with_column(CDC_COLUMN_NAME, lit("insert"))?; - - change_data.push(cdc_insert_df); - - let after = cdc_projection - .clone() - .filter(col(TARGET_COLUMN).is_true())? - .select(write_projection.clone())?; - - // Extra select_columns is required so that before and after have same schema order - // DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650 - let before = cdc_projection - .clone() - .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())? - .select( - target_schema - .columns() - .iter() - .filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN) - .map(|c| Expr::Column(c.clone())) - .collect_vec(), + .drop_columns(&["__delta_rs_path"])? // WEIRD bug caused by interaction with unnest_columns, has to be dropped otherwise throws schema error + .with_column( + "__delta_rs_update_expanded", + when( + col(CDC_COLUMN_NAME).eq(lit("update")), + lit(ScalarValue::List(ScalarValue::new_list( + &[ + ScalarValue::Utf8(Some("update_preimage".into())), + ScalarValue::Utf8(Some("update_postimage".into())), + ], + &DataType::List(Field::new("element", DataType::Utf8, false).into()), + true, + ))), + ) + .end()?, )? - .select_columns( - &after - .schema() - .columns() - .iter() - .map(|v| v.name()) - .collect::>(), - )?; - - let tracker = CDCTracker::new(before, after); - change_data.push(tracker.collect()?); - } - - let mut project = filtered.clone().select(write_projection)?; - - if should_cdc && !change_data.is_empty() { - let mut df = change_data - .pop() - .expect("change_data should never be empty"); - // Accumulate all the changes together into a single data frame to produce the necessary - // change data files - for change in change_data { - df = df.union(change)?; - } - project = project - .with_column(CDC_COLUMN_NAME, lit(ScalarValue::Null))? - .union(df)?; - } + .unnest_columns(&["__delta_rs_update_expanded"])? + .with_column( + CDC_COLUMN_NAME, + when( + col(CDC_COLUMN_NAME).eq(lit("update")), + col("__delta_rs_update_expanded"), + ) + .otherwise(col(CDC_COLUMN_NAME))?, + )? + .drop_columns(&["__delta_rs_update_expanded"])? + .select(write_projection_with_cdf)? + } else { + operation_count + .filter(col(DELETE_COLUMN).is_false())? + .select(write_projection)? + }; - project = add_generated_columns( - project, + projected = add_generated_columns( + projected, &generated_col_expressions, &missing_generated_columns, &state, )?; - let merge_final = &project.into_unoptimized_plan(); + let merge_final = &projected.into_unoptimized_plan(); let write = state.create_physical_plan(merge_final).await?; let err = || DeltaTableError::Generic("Unable to locate expected metric node".into()); diff --git a/crates/core/src/operations/merge/writer.rs b/crates/core/src/operations/merge/writer.rs index a477286241..2c563abd10 100644 --- a/crates/core/src/operations/merge/writer.rs +++ b/crates/core/src/operations/merge/writer.rs @@ -9,7 +9,7 @@ use arrow_schema::{Schema, SchemaRef as ArrowSchemaRef}; use datafusion::catalog::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; -use datafusion_expr::col; +use datafusion_expr::{col, lit}; use datafusion_physical_plan::ExecutionPlan; use futures::StreamExt; use object_store::prefix::PrefixStore; @@ -170,9 +170,21 @@ pub(crate) async fn write_execution_plan_v2( let normal_df = batch_df .clone() - .filter(col(CDC_COLUMN_NAME).is_null())? + .filter(col(CDC_COLUMN_NAME).in_list( + vec![lit("delete"), lit("source_delete"), lit("update_preimage")], + true, + ))? .drop_columns(&[CDC_COLUMN_NAME])?; - let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).is_not_null())?; + + let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list( + vec![ + lit("delete"), + lit("insert"), + lit("update_preimage"), + lit("update_postimage"), + ], + false, + ))?; let normal_batch = concat_batches(&write_schema, &normal_df.collect().await?)?;