diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 68ae2c164c..b08273164a 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -86,7 +86,7 @@ use crate::table::state::DeltaTableState; use crate::table::Constraint; use crate::{open_table, open_table_with_storage_options, DeltaTable}; -const PATH_COLUMN: &str = "__delta_rs_path"; +pub(crate) const PATH_COLUMN: &str = "__delta_rs_path"; pub mod cdf; pub mod expr; diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index f95b32ea75..6baf517d15 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -8,6 +8,8 @@ use crate::DeltaResult; use datafusion::prelude::*; use datafusion_common::ScalarValue; +pub const CDC_COLUMN_NAME: &str = "_change_type"; + /// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files /// associated with commits pub(crate) struct CDCTracker { @@ -88,8 +90,9 @@ mod tests { use crate::operations::DeltaOps; use crate::{DeltaConfigKey, DeltaTable}; use arrow::array::{ArrayRef, Int32Array, StructArray}; + use arrow::datatypes::{DataType, Field}; use arrow_array::RecordBatch; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::Schema; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::{MemTable, TableProvider}; diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 8ddd948d35..a2bbaca0c9 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -43,7 +43,6 @@ use serde::Serialize; use super::cdc::should_write_cdc; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use super::write::WriterStatsConfig; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ @@ -52,7 +51,7 @@ use crate::delta_datafusion::{ }; use crate::errors::DeltaResult; use crate::kernel::{Action, Add, Remove}; -use crate::operations::write::{write_execution_plan, write_execution_plan_cdc}; +use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::{DeltaTable, DeltaTableError}; diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 57542ab668..7baf5f19fd 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -7,6 +7,7 @@ use datafusion_physical_expr::{ use std::sync::Arc; use std::time::SystemTime; +use arrow::record_batch::RecordBatch; use arrow_schema::{ArrowError, Field}; use chrono::{DateTime, Utc}; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -329,37 +330,34 @@ impl CdfLoadBuilder { } } +/// Helper function to collect batches associated with reading CDF data +pub(crate) async fn collect_batches( + num_partitions: usize, + stream: DeltaCdfScan, + ctx: SessionContext, +) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> { + let mut batches = vec![]; + for p in 0..num_partitions { + let data: Vec<RecordBatch> = + crate::operations::collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?; + batches.extend_from_slice(&data); + } + Ok(batches) +} + #[cfg(test)] mod tests { use super::*; - use std::error::Error; use std::str::FromStr; - use arrow_array::RecordBatch; use chrono::NaiveDateTime; use datafusion::physical_plan::ExecutionPlan; use
datafusion::prelude::SessionContext; use datafusion_common::assert_batches_sorted_eq; - use crate::delta_datafusion::cdf::DeltaCdfScan; - use crate::operations::collect_sendable_stream; use crate::writer::test_utils::TestResult; use crate::DeltaOps; - async fn collect_batches( - num_partitions: usize, - stream: DeltaCdfScan, - ctx: SessionContext, - ) -> Result<Vec<RecordBatch>, Box<dyn Error>> { - let mut batches = vec![]; - for p in 0..num_partitions { - let data: Vec<RecordBatch> = - collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?; - batches.extend_from_slice(&data); - } - Ok(batches) - } - #[tokio::test] async fn test_load_local() -> TestResult { let ctx = SessionContext::new(); diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 1699011f51..e7b5e43ebc 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -59,6 +59,7 @@ use futures::future::BoxFuture; use itertools::Itertools; use parquet::file::properties::WriterProperties; use serde::Serialize; +use tracing::log::*; use self::barrier::{MergeBarrier, MergeBarrierExec}; @@ -74,9 +75,10 @@ use crate::delta_datafusion::{ }; use crate::kernel::Action; use crate::logstore::LogStoreRef; +use crate::operations::cdc::*; use crate::operations::merge::barrier::find_barrier_node; use crate::operations::transaction::CommitBuilder; -use crate::operations::write::{write_execution_plan, WriterStatsConfig}; +use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig}; use crate::protocol::{DeltaOperation, MergePredicate}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -1014,6 +1016,15 @@ async fn execute( ) -> DeltaResult<(DeltaTableState, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); + // Determining whether we should write change data once so that computation of change data can + // be disabled in the common case(s) + let should_cdc = should_write_cdc(&snapshot)?; + // Change data may be collected and then written out at the completion of the merge + let mut change_data = vec![]; + + if should_cdc { + debug!("Executing a merge which should write CDC files"); + } let current_metadata = snapshot.metadata(); let merge_planner = DeltaPlanner:: { @@ -1076,6 +1087,7 @@ async fn execute( let source_schema = source.schema(); let target_schema = target.schema(); let join_schema_df = build_join_schema(source_schema, target_schema, &JoinType::Full)?; + let predicate = match predicate { Expression::DataFusion(expr) => expr, Expression::String(s) => parse_predicate_expression(&join_schema_df, s, &state)?, @@ -1119,7 +1131,7 @@ async fn execute( None => LogicalPlanBuilder::scan(target_name.clone(), target_provider, None)?.build()?, }; - let source = DataFrame::new(state.clone(), source); + let source = DataFrame::new(state.clone(), source.clone()); let source = source.with_column(SOURCE_COLUMN, lit(true))?; // Not match operations imply a full scan of the target table is required @@ -1412,7 +1424,7 @@ async fn execute( let merge_barrier = LogicalPlan::Extension(Extension { node: Arc::new(MergeBarrier { - input: new_columns, + input: new_columns.clone(), expr: distrbute_expr, file_column, }), @@ -1427,11 +1439,62 @@ async fn execute( }); let operation_count = DataFrame::new(state.clone(), operation_count); + + if should_cdc { + // Create a dataframe containing the CDC deletes which are present at this point + change_data.push( + operation_count + .clone() +
.filter(col(DELETE_COLUMN))? + .select(write_projection.clone())? + .with_column(crate::operations::cdc::CDC_COLUMN_NAME, lit("delete"))?, + ); + } + let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?; - let project = filtered.select(write_projection)?; - let merge_final = &project.into_unoptimized_plan(); + if should_cdc { + debug!("The merge should trigger CDC tracking, computing pre/insert/postimage datasets"); + let cdc_projection = filtered.clone().filter(col(OPERATION_COLUMN).not_eq( + // `lit(5)` represents the copy operation; ideally this would reference the enum rather than a hardcoded int + lit(5), + ))?; + + change_data.push( + cdc_projection + .clone() + .filter( + col(SOURCE_COLUMN) + .is_true() + .and(col(TARGET_COLUMN).is_null()), + )? + .select(write_projection.clone())? + .with_column(CDC_COLUMN_NAME, lit("insert"))?, + ); + let before = cdc_projection + .clone() + .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())? + .select( + target_schema + .columns() + .iter() + .filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN) + .map(|c| Expr::Column(c.clone())) + .collect_vec(), + )?; + let after = cdc_projection + .clone() + .filter(col(TARGET_COLUMN).is_true())? + .select(write_projection.clone())?; + + let tracker = CDCTracker::new(before, after); + change_data.push(tracker.collect()?); + } + + let project = filtered.clone().select(write_projection)?; + + let merge_final = &project.into_unoptimized_plan(); let write = state.create_physical_plan(merge_final).await?; let err = || DeltaTableError::Generic("Unable to locate expected metric node".into()); @@ -1451,7 +1514,7 @@ ); let rewrite_start = Instant::now(); - let add_actions = write_execution_plan( + let mut add_actions = write_execution_plan( Some(&snapshot), state.clone(), write, @@ -1459,12 +1522,38 @@ log_store.object_store(), Some(snapshot.table_config().target_file_size() as usize), None, - writer_properties, - writer_stats_config, + writer_properties.clone(), + writer_stats_config.clone(), None, ) .await?; + if should_cdc && !change_data.is_empty() { + let mut df = change_data + .pop() + .expect("change_data should never be empty"); + // Accumulate all the changes together into a single data frame to produce the necessary + // change data files + for change in change_data { + df = df.union(change)?; + } + add_actions.extend( + write_execution_plan_cdc( + Some(&snapshot), + state.clone(), + df.create_physical_plan().await?, + table_partition_cols.clone(), + log_store.object_store(), + Some(snapshot.table_config().target_file_size() as usize), + None, + writer_properties, + writer_stats_config, + None, + ) + .await?, + ); + } + metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64; let mut actions: Vec<Action> = add_actions.clone(); @@ -1604,6 +1693,7 @@ mod tests { use crate::kernel::DataType; use crate::kernel::PrimitiveType; use crate::kernel::StructField; + use crate::operations::load_cdf::collect_batches; use crate::operations::merge::generalize_filter; use crate::operations::merge::try_construct_early_filter; use crate::operations::DeltaOps; @@ -1620,8 +1710,8 @@ mod tests { use arrow_schema::Field; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::provider_as_source; - use datafusion::prelude::DataFrame; - use datafusion::prelude::SessionContext; + use datafusion::physical_plan::ExecutionPlan; + use datafusion::prelude::*; use datafusion_common::Column; use datafusion_common::ScalarValue; use
datafusion_common::TableReference; @@ -3416,4 +3506,230 @@ mod tests { let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); } + + #[tokio::test] + async fn test_merge_cdc_disabled() { + let (table, source) = setup().await; + + let (table, metrics) = DeltaOps(table) + .merge(source, col("target.id").eq(col("source.id"))) + .with_source_alias("source") + .with_target_alias("target") + .when_matched_update(|update| { + update + .update("value", col("source.value")) + .update("modified", col("source.modified")) + }) + .unwrap() + .when_not_matched_by_source_update(|update| { + update + .predicate(col("target.value").eq(lit(1))) + .update("value", col("target.value") + lit(1)) + }) + .unwrap() + .when_not_matched_insert(|insert| { + insert + .set("id", col("source.id")) + .set("value", col("source.value")) + .set("modified", col("source.modified")) + }) + .unwrap() + .await + .unwrap(); + + assert_merge(table.clone(), metrics).await; + + // Verify that no CDC files were actually written, since CDC is disabled for this table + if let Ok(files) = crate::storage::utils::flatten_list_stream( + &table.object_store(), + Some(&object_store::path::Path::from("_change_data")), + ) + .await + { + assert_eq!( + 0, + files.len(), + "This test should not find any written CDC files! {files:#?}" + ); + } + } + + #[tokio::test] + async fn test_merge_cdc_enabled_simple() { + // Manually creating the desired table with the right minimum CDC features + use crate::kernel::Protocol; + use crate::operations::merge::Action; + + let _ = pretty_env_logger::try_init(); + let schema = get_delta_schema(); + + let actions = vec![Action::Protocol(Protocol::new(1, 4))]; + let table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_columns(schema.fields().cloned()) + .with_actions(actions) + .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true")) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let schema = get_arrow_schema(&None); + let table = write_data(table, &schema).await; + + assert_eq!(table.version(), 1); + assert_eq!(table.get_files_count(), 1); + let source = merge_source(schema); + + let (table, metrics) = DeltaOps(table) + .merge(source, col("target.id").eq(col("source.id"))) + .with_source_alias("source") + .with_target_alias("target") + .when_matched_update(|update| { + update + .update("value", col("source.value")) + .update("modified", col("source.modified")) + }) + .unwrap() + .when_not_matched_by_source_update(|update| { + update + .predicate(col("target.value").eq(lit(1))) + .update("value", col("target.value") + lit(1)) + }) + .unwrap() + .when_not_matched_insert(|insert| { + insert + .set("id", col("source.id")) + .set("value", col("source.value")) + .set("modified", col("source.modified")) + }) + .unwrap() + .await + .unwrap(); + + assert_merge(table.clone(), metrics).await; + + let ctx = SessionContext::new(); + let table = DeltaOps(table) + .load_cdf() + .with_session_ctx(ctx.clone()) + .with_starting_version(0) + .build() + .await + .expect("Failed to load CDF"); + + let mut batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table, + ctx, + ) + .await + .expect("Failed to collect batches"); + + let _ = arrow::util::pretty::print_batches(&batches); + + // The batches will contain a current _commit_timestamp which shouldn't be checked, so drop that column + let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect(); + + assert_batches_sorted_eq!
{[ + "+----+-------+------------+------------------+-----------------+", + "| id | value | modified | _change_type | _commit_version |", + "+----+-------+------------+------------------+-----------------+", + "| A | 1 | 2021-02-01 | update_preimage | 2 |", + "| A | 2 | 2021-02-01 | update_postimage | 2 |", + "| B | 10 | 2021-02-01 | update_preimage | 2 |", + "| B | 10 | 2021-02-02 | update_postimage | 2 |", + "| C | 10 | 2021-02-02 | update_preimage | 2 |", + "| C | 20 | 2023-07-04 | update_postimage | 2 |", + "| X | 30 | 2023-07-04 | insert | 2 |", + "| A | 1 | 2021-02-01 | insert | 1 |", + "| B | 10 | 2021-02-01 | insert | 1 |", + "| C | 10 | 2021-02-02 | insert | 1 |", + "| D | 100 | 2021-02-02 | insert | 1 |", + "+----+-------+------------+------------------+-----------------+", + ], &batches } + } + + #[tokio::test] + async fn test_merge_cdc_enabled_delete() { + // Manually creating the desired table with the right minimum CDC features + use crate::kernel::Protocol; + use crate::operations::merge::Action; + + let _ = pretty_env_logger::try_init(); + let schema = get_delta_schema(); + + let actions = vec![Action::Protocol(Protocol::new(1, 4))]; + let table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_columns(schema.fields().cloned()) + .with_actions(actions) + .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true")) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let schema = get_arrow_schema(&None); + let table = write_data(table, &schema).await; + + assert_eq!(table.version(), 1); + assert_eq!(table.get_files_count(), 1); + let source = merge_source(schema); + + let (table, _metrics) = DeltaOps(table) + .merge(source, col("target.id").eq(col("source.id"))) + .with_source_alias("source") + .with_target_alias("target") + .when_not_matched_by_source_delete(|delete| { + delete.predicate(col("target.modified").gt(lit("2021-02-01"))) + }) + .unwrap() + .await + .unwrap(); + + let expected = vec![ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 1 | 2021-02-01 |", + "| B | 10 | 2021-02-01 |", + "| C | 10 | 2021-02-02 |", + "+----+-------+------------+", + ]; + let actual = get_data(&table).await; + assert_batches_sorted_eq!(&expected, &actual); + + let ctx = SessionContext::new(); + let table = DeltaOps(table) + .load_cdf() + .with_session_ctx(ctx.clone()) + .with_starting_version(0) + .build() + .await + .expect("Failed to load CDF"); + + let mut batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table, + ctx, + ) + .await + .expect("Failed to collect batches"); + + let _ = arrow::util::pretty::print_batches(&batches); + + // The batches will contain a current _commit_timestamp which shouldn't be checked, so drop that column + let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect(); + + assert_batches_sorted_eq!
{[ + "+----+-------+------------+--------------+-----------------+", + "| id | value | modified | _change_type | _commit_version |", + "+----+-------+------------+--------------+-----------------+", + "| D | 100 | 2021-02-02 | delete | 2 |", + "| A | 1 | 2021-02-01 | insert | 1 |", + "| B | 10 | 2021-02-01 | insert | 1 |", + "| C | 10 | 2021-02-02 | insert | 1 |", + "| D | 100 | 2021-02-02 | insert | 1 |", + "+----+-------+------------+--------------+-----------------+", + ], &batches } + } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 37837ed60a..60b9bfd160 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -484,18 +484,16 @@ impl std::future::IntoFuture for UpdateBuilder { mod tests { use super::*; - use crate::delta_datafusion::cdf::DeltaCdfScan; use crate::kernel::DataType as DeltaDataType; use crate::kernel::{Action, PrimitiveType, Protocol, StructField, StructType}; - use crate::operations::collect_sendable_stream; + use crate::operations::load_cdf::*; use crate::operations::DeltaOps; use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::datafusion::write_batch; use crate::writer::test_utils::{ get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, }; - use crate::DeltaConfigKey; - use crate::DeltaTable; + use crate::{DeltaConfigKey, DeltaTable}; use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::Schema as ArrowSchema; use arrow::datatypes::{Field, Schema}; @@ -1212,18 +1210,4 @@ mod tests { "+-------+------------------+-----------------+------+", ], &batches } } - - async fn collect_batches( - num_partitions: usize, - stream: DeltaCdfScan, - ctx: SessionContext, - ) -> Result, Box> { - let mut batches = vec![]; - for p in 0..num_partitions { - let data: Vec = - collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?; - batches.extend_from_slice(&data); - } - Ok(batches) - } }