From 747603fe623e5ec91468ec5c85fda0399b6016f4 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 10 Aug 2024 21:34:39 +0000 Subject: [PATCH] feat: introduce change data capture support for merge operations This does not have as much testing as it probably should, but preliminary results are looking good! --- crates/core/src/delta_datafusion/mod.rs | 2 +- crates/core/src/operations/cdc.rs | 9 +- crates/core/src/operations/delete.rs | 3 +- crates/core/src/operations/merge/mod.rs | 118 ++++++++++++++++++------ 4 files changed, 94 insertions(+), 38 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 68ae2c164c..b08273164a 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -86,7 +86,7 @@ use crate::table::state::DeltaTableState; use crate::table::Constraint; use crate::{open_table, open_table_with_storage_options, DeltaTable}; -const PATH_COLUMN: &str = "__delta_rs_path"; +pub(crate) const PATH_COLUMN: &str = "__delta_rs_path"; pub mod cdf; pub mod expr; diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index 6120d3c1d2..6baf517d15 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -8,6 +8,8 @@ use crate::DeltaResult; use datafusion::prelude::*; use datafusion_common::ScalarValue; +pub const CDC_COLUMN_NAME: &str = "_change_type"; + /// The CDCTracker is useful for hooking reads/writes in a manner nececessary to create CDC files /// associated with commits pub(crate) struct CDCTracker { @@ -88,15 +90,12 @@ mod tests { use crate::operations::DeltaOps; use crate::{DeltaConfigKey, DeltaTable}; use arrow::array::{ArrayRef, Int32Array, StructArray}; - use arrow::datatypes::{DataType, Field, SchemaRef}; + use arrow::datatypes::{DataType, Field}; use arrow_array::RecordBatch; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::Schema; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::{MemTable, TableProvider}; - use std::sync::Arc; - use tracing::log::*; - /// A simple test which validates primitive writer version 1 tables should /// not write Change Data Files #[tokio::test] diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 8ddd948d35..a2bbaca0c9 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -43,7 +43,6 @@ use serde::Serialize; use super::cdc::should_write_cdc; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use super::write::WriterStatsConfig; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ @@ -52,7 +51,7 @@ use crate::delta_datafusion::{ }; use crate::errors::DeltaResult; use crate::kernel::{Action, Add, Remove}; -use crate::operations::write::{write_execution_plan, write_execution_plan_cdc}; +use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::{DeltaTable, DeltaTableError}; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 1bfa595fbd..e7b5e43ebc 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1447,12 +1447,51 @@ async fn execute( .clone() .filter(col(DELETE_COLUMN))? .select(write_projection.clone())? - .with_column("_change_type", lit("delete"))?, + .with_column(crate::operations::cdc::CDC_COLUMN_NAME, lit("delete"))?, ); } let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?; + if should_cdc { + debug!("The merge should triggere a CDC tracking, computing pre/insert/postimage datasets"); + let cdc_projection = filtered.clone().filter(col(OPERATION_COLUMN).not_eq( + // This is a copy operation, but I'm not sure how to turn that enum into an int + lit(5), + ))?; + + change_data.push( + cdc_projection + .clone() + .filter( + col(SOURCE_COLUMN) + .is_true() + .and(col(TARGET_COLUMN).is_null()), + )? + .select(write_projection.clone())? + .with_column(CDC_COLUMN_NAME, lit("insert"))?, + ); + let before = cdc_projection + .clone() + .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())? + .select( + target_schema + .columns() + .iter() + .filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN) + .map(|c| Expr::Column(c.clone())) + .collect_vec(), + )?; + + let after = cdc_projection + .clone() + .filter(col(TARGET_COLUMN).is_true())? + .select(write_projection.clone())?; + + let tracker = CDCTracker::new(before, after); + change_data.push(tracker.collect()?); + } + let project = filtered.clone().select(write_projection)?; let merge_final = &project.into_unoptimized_plan(); @@ -1483,24 +1522,31 @@ async fn execute( log_store.object_store(), Some(snapshot.table_config().target_file_size() as usize), None, - writer_properties, - writer_stats_config, + writer_properties.clone(), + writer_stats_config.clone(), None, ) .await?; if should_cdc && !change_data.is_empty() { + let mut df = change_data + .pop() + .expect("change_data should never be empty"); + // Accumulate all the changes together into a single data frame to produce the necessary + // change data files + for change in change_data { + df = df.union(change)?; + } add_actions.extend( write_execution_plan_cdc( Some(&snapshot), state.clone(), - change_data.pop().unwrap().create_physical_plan().await?, + df.create_physical_plan().await?, table_partition_cols.clone(), log_store.object_store(), Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, - safe_cast, writer_stats_config, None, ) @@ -3562,19 +3608,45 @@ mod tests { assert_merge(table.clone(), metrics).await; - // Just checking that the data wasn't actually written instead! - if let Ok(files) = crate::storage::utils::flatten_list_stream( - &table.object_store(), - Some(&object_store::path::Path::from("_change_data")), + let ctx = SessionContext::new(); + let table = DeltaOps(table) + .load_cdf() + .with_session_ctx(ctx.clone()) + .with_starting_version(0) + .build() + .await + .expect("Failed to load CDF"); + + let mut batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table, + ctx, ) .await - { - assert_eq!( - 1, - files.len(), - "This test should find written CDC files! {files:#?}" - ); - } + .expect("Failed to collect batches"); + + let _ = arrow::util::pretty::print_batches(&batches); + + // The batches will contain a current _commit_timestamp which shouldn't be check_append_only + let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect(); + + assert_batches_sorted_eq! {[ + "+----+-------+------------+------------------+-----------------+", + "| id | value | modified | _change_type | _commit_version |", + "+----+-------+------------+------------------+-----------------+", + "| A | 1 | 2021-02-01 | update_preimage | 2 |", + "| A | 2 | 2021-02-01 | update_postimage | 2 |", + "| B | 10 | 2021-02-01 | update_preimage | 2 |", + "| B | 10 | 2021-02-02 | update_postimage | 2 |", + "| C | 10 | 2021-02-02 | update_preimage | 2 |", + "| C | 20 | 2023-07-04 | update_postimage | 2 |", + "| X | 30 | 2023-07-04 | insert | 2 |", + "| A | 1 | 2021-02-01 | insert | 1 |", + "| B | 10 | 2021-02-01 | insert | 1 |", + "| C | 10 | 2021-02-02 | insert | 1 |", + "| D | 100 | 2021-02-02 | insert | 1 |", + "+----+-------+------------+------------------+-----------------+", + ], &batches } } #[tokio::test] @@ -3626,20 +3698,6 @@ mod tests { let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); - // Just checking that the data wasn't actually written instead! - if let Ok(files) = crate::storage::utils::flatten_list_stream( - &table.object_store(), - Some(&object_store::path::Path::from("_change_data")), - ) - .await - { - assert_eq!( - 1, - files.len(), - "This test should find written CDC files! {files:#?}" - ); - } - let ctx = SessionContext::new(); let table = DeltaOps(table) .load_cdf()