diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 7f87d30d35..fe8d030464 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1154,6 +1154,14 @@ async fn execute( .select(write_projection.clone())? .with_column(CDC_COLUMN_NAME, lit("insert"))?, ); + + let after = cdc_projection + .clone() + .filter(col(TARGET_COLUMN).is_true())? + .select(write_projection.clone())?; + + // Extra select_columns is required so that before and after have same schema order + // DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650 let before = cdc_projection .clone() .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())? @@ -1164,13 +1172,16 @@ async fn execute( .filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN) .map(|c| Expr::Column(c.clone())) .collect_vec(), + )? + .select_columns( + &after + .schema() + .columns() + .iter() + .map(|v| v.name()) + .collect::>(), )?; - let after = cdc_projection - .clone() - .filter(col(TARGET_COLUMN).is_true())? - .select(write_projection.clone())?; - let tracker = CDCTracker::new(before, after); change_data.push(tracker.collect()?); } diff --git a/python/Cargo.toml b/python/Cargo.toml index 761ca51067..fadc260d0d 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.20.1" +version = "0.20.2" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 54c2726fd3..2306e1668a 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -1,3 +1,5 @@ +import datetime +import os import pathlib import pyarrow as pa @@ -1038,3 +1040,40 @@ def test_merge_isin_partition_pruning( assert result == expected assert metrics["num_target_files_scanned"] == 2 assert metrics["num_target_files_skipped_during_scan"] == 3 + + +def test_cdc_merge_planning_union_2908(tmp_path): + """https://github.com/delta-io/delta-rs/issues/2908""" + cdc_path = f"{tmp_path}/_change_data" + + data = { + "id": pa.array([1, 2], pa.int64()), + "date": pa.array( + [datetime.date(1970, 1, 1), datetime.date(1970, 1, 2)], pa.date32() + ), + } + + table = pa.Table.from_pydict(data) + + dt = DeltaTable.create( + table_uri=tmp_path, + schema=table.schema, + mode="overwrite", + partition_by=["id"], + configuration={ + "delta.enableChangeDataFeed": "true", + }, + ) + + dt.merge( + source=table, + predicate="s.id = t.id", + source_alias="s", + target_alias="t", + ).when_not_matched_insert_all().execute() + + last_action = dt.history(1)[0] + + assert last_action["operation"] == "MERGE" + assert dt.version() == 1 + assert os.path.exists(cdc_path), "_change_data doesn't exist"