feat: introduce change data capture support for merge operations
This does not have as much testing as it probably should, but
preliminary results are looking good!
rtyler committed Aug 12, 2024
1 parent 4dae148 commit 747603f
Showing 4 changed files with 94 additions and 38 deletions.
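For context, a minimal sketch of how this feature is exercised end to end. This is not part of the commit: the function name merge_with_cdc, the id/value column names, and the public deltalake crate paths are assumptions for illustration, and CDC files are only produced when the target table has delta.enableChangeDataFeed set.

use deltalake::operations::DeltaOps;
use deltalake::datafusion::prelude::*;
use deltalake::{DeltaTable, DeltaTableError};

async fn merge_with_cdc(table: DeltaTable, source: DataFrame) -> Result<(), DeltaTableError> {
    // Upsert source rows into the target; with change data capture enabled on
    // the table, the merge commit should also write files under _change_data/
    let (table, _metrics) = DeltaOps(table)
        .merge(source, col("target.id").eq(col("source.id")))
        .with_source_alias("source")
        .with_target_alias("target")
        .when_matched_update(|update| update.update("value", col("source.value")))?
        .when_not_matched_insert(|insert| {
            insert
                .set("id", col("source.id"))
                .set("value", col("source.value"))
        })?
        .await?;

    // The resulting change rows are then readable through the change data feed
    let _cdf_scan = DeltaOps(table)
        .load_cdf()
        .with_starting_version(0)
        .build()
        .await?;
    Ok(())
}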
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/mod.rs
@@ -86,7 +86,7 @@ use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::{open_table, open_table_with_storage_options, DeltaTable};

-const PATH_COLUMN: &str = "__delta_rs_path";
+pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

pub mod cdf;
pub mod expr;
9 changes: 4 additions & 5 deletions crates/core/src/operations/cdc.rs
@@ -8,6 +8,8 @@ use crate::DeltaResult;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;

+pub const CDC_COLUMN_NAME: &str = "_change_type";

/// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files
/// associated with commits
pub(crate) struct CDCTracker {
@@ -88,15 +90,12 @@ mod tests {
use crate::operations::DeltaOps;
use crate::{DeltaConfigKey, DeltaTable};
use arrow::array::{ArrayRef, Int32Array, StructArray};
-use arrow::datatypes::{DataType, Field, SchemaRef};
+use arrow::datatypes::{DataType, Field};
use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Field, Schema};
+use arrow_schema::Schema;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::{MemTable, TableProvider};

use std::sync::Arc;
use tracing::log::*;

/// A simple test which validates that primitive writer version 1 tables should
/// not write Change Data Files
#[tokio::test]
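The CDCTracker referenced above pairs a "before" and an "after" view of the data touched by an operation. As a hedged sketch of its semantics only (the real implementation lives in cdc.rs and may differ in detail), collect() subtracts each frame from the other so untouched rows cancel out, then tags what remains:

use datafusion::prelude::*;
use crate::DeltaResult; // as imported in crates/core/src/operations/cdc.rs

pub const CDC_COLUMN_NAME: &str = "_change_type";

pub(crate) struct CDCTracker {
    pre_dataframe: DataFrame,
    post_dataframe: DataFrame,
}

impl CDCTracker {
    pub(crate) fn new(pre_dataframe: DataFrame, post_dataframe: DataFrame) -> Self {
        Self { pre_dataframe, post_dataframe }
    }

    /// Rows present only in the "before" frame become update_preimage rows,
    /// rows present only in the "after" frame become update_postimage rows,
    /// and rows common to both (unchanged data) cancel out via except()
    pub(crate) fn collect(self) -> DeltaResult<DataFrame> {
        let pre = self.pre_dataframe.clone().except(self.post_dataframe.clone())?;
        let post = self.post_dataframe.except(self.pre_dataframe)?;
        Ok(pre
            .with_column(CDC_COLUMN_NAME, lit("update_preimage"))?
            .union(post.with_column(CDC_COLUMN_NAME, lit("update_postimage"))?)?)
    }
}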
3 changes: 1 addition & 2 deletions crates/core/src/operations/delete.rs
@@ -43,7 +43,6 @@ use serde::Serialize;
use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use super::write::WriterStatsConfig;

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
@@ -52,7 +51,7 @@ use crate::delta_datafusion::{
};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
-use crate::operations::write::{write_execution_plan, write_execution_plan_cdc};
+use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaTable, DeltaTableError};
118 changes: 88 additions & 30 deletions crates/core/src/operations/merge/mod.rs
@@ -1447,12 +1447,51 @@ async fn execute(
.clone()
.filter(col(DELETE_COLUMN))?
.select(write_projection.clone())?
.with_column("_change_type", lit("delete"))?,
.with_column(crate::operations::cdc::CDC_COLUMN_NAME, lit("delete"))?,
);
}

let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?;

+    if should_cdc {
+        debug!("The merge should trigger CDC tracking, computing pre/insert/postimage datasets");
+        let cdc_projection = filtered.clone().filter(col(OPERATION_COLUMN).not_eq(
+            // This is a copy operation, but I'm not sure how to turn that enum into an int
+            lit(5),
+        ))?;
+
+        change_data.push(
+            cdc_projection
+                .clone()
+                .filter(
+                    col(SOURCE_COLUMN)
+                        .is_true()
+                        .and(col(TARGET_COLUMN).is_null()),
+                )?
+                .select(write_projection.clone())?
+                .with_column(CDC_COLUMN_NAME, lit("insert"))?,
+        );
+        let before = cdc_projection
+            .clone()
+            .filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
+            .select(
+                target_schema
+                    .columns()
+                    .iter()
+                    .filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN)
+                    .map(|c| Expr::Column(c.clone()))
+                    .collect_vec(),
+            )?;
+
+        let after = cdc_projection
+            .clone()
+            .filter(col(TARGET_COLUMN).is_true())?
+            .select(write_projection.clone())?;
+
+        let tracker = CDCTracker::new(before, after);
+        change_data.push(tracker.collect()?);
+    }

let project = filtered.clone().select(write_projection)?;

let merge_final = &project.into_unoptimized_plan();
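As a standalone illustration of the tagging pattern used in the block above (assumed example code with a hypothetical tag_changes function, not the merge internals): each change category is just a DataFusion DataFrame with a literal _change_type column appended before the frames are unioned and written.

use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::*;

async fn tag_changes() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let batch = RecordBatch::try_new(schema, vec![ids])?;

    // Tag every row in this frame as an insert, as the merge path does above
    let inserts = ctx.read_batch(batch)?.with_column("_change_type", lit("insert"))?;
    inserts.show().await?;
    Ok(())
}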
@@ -1483,24 +1522,31 @@ async fn execute(
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
-        writer_properties,
-        writer_stats_config,
+        writer_properties.clone(),
+        writer_stats_config.clone(),
None,
)
.await?;

if should_cdc && !change_data.is_empty() {
+        let mut df = change_data
+            .pop()
+            .expect("change_data should never be empty");
+        // Accumulate all the changes together into a single data frame to produce the necessary
+        // change data files
+        for change in change_data {
+            df = df.union(change)?;
+        }
add_actions.extend(
write_execution_plan_cdc(
Some(&snapshot),
state.clone(),
-                change_data.pop().unwrap().create_physical_plan().await?,
+                df.create_physical_plan().await?,
table_partition_cols.clone(),
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties,
safe_cast,
writer_stats_config,
None,
)
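A note on the accumulation above: the pop-then-union loop is equivalent to folding union over the collected frames, and each union only combines logical plans, so a single physical plan is produced for all change data at the end. A compact sketch with a hypothetical union_all helper, assuming a non-empty vector as the surrounding is_empty() check guarantees:

use datafusion::dataframe::DataFrame;
use datafusion::error::Result;

fn union_all(mut change_data: Vec<DataFrame>) -> Result<DataFrame> {
    let first = change_data.pop().expect("guarded by !change_data.is_empty()");
    // Fold the remaining frames into one logical plan via union
    change_data.into_iter().try_fold(first, |df, change| df.union(change))
}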
@@ -3562,19 +3608,45 @@ mod tests {

assert_merge(table.clone(), metrics).await;

-        // Just checking that the data wasn't actually written instead!
-        if let Ok(files) = crate::storage::utils::flatten_list_stream(
-            &table.object_store(),
-            Some(&object_store::path::Path::from("_change_data")),
+        let ctx = SessionContext::new();
+        let table = DeltaOps(table)
+            .load_cdf()
+            .with_session_ctx(ctx.clone())
+            .with_starting_version(0)
+            .build()
+            .await
+            .expect("Failed to load CDF");
+
+        let mut batches = collect_batches(
+            table.properties().output_partitioning().partition_count(),
+            table,
+            ctx,
)
.await
-        {
-            assert_eq!(
-                1,
-                files.len(),
-                "This test should find written CDC files! {files:#?}"
-            );
-        }
.expect("Failed to collect batches");

let _ = arrow::util::pretty::print_batches(&batches);

// The batches will contain a current _commit_timestamp which shouldn't be check_append_only
let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect();

assert_batches_sorted_eq! {[
"+----+-------+------------+------------------+-----------------+",
"| id | value | modified | _change_type | _commit_version |",
"+----+-------+------------+------------------+-----------------+",
"| A | 1 | 2021-02-01 | update_preimage | 2 |",
"| A | 2 | 2021-02-01 | update_postimage | 2 |",
"| B | 10 | 2021-02-01 | update_preimage | 2 |",
"| B | 10 | 2021-02-02 | update_postimage | 2 |",
"| C | 10 | 2021-02-02 | update_preimage | 2 |",
"| C | 20 | 2023-07-04 | update_postimage | 2 |",
"| X | 30 | 2023-07-04 | insert | 2 |",
"| A | 1 | 2021-02-01 | insert | 1 |",
"| B | 10 | 2021-02-01 | insert | 1 |",
"| C | 10 | 2021-02-02 | insert | 1 |",
"| D | 100 | 2021-02-02 | insert | 1 |",
"+----+-------+------------+------------------+-----------------+",
], &batches }
}
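The collect_batches helper used above is defined elsewhere in this test module; the following is a hedged sketch of its likely shape, with the scan typed as a generic ExecutionPlan (an assumption based only on the call site above).

use std::sync::Arc;
use arrow_array::RecordBatch;
use datafusion::physical_plan::{common, ExecutionPlan};
use datafusion::prelude::SessionContext;

async fn collect_batches(
    num_partitions: usize,
    stream: Arc<dyn ExecutionPlan>,
    ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let mut batches = vec![];
    for p in 0..num_partitions {
        // Drain each partition of the CDF scan into record batches
        let data = common::collect(stream.execute(p, ctx.task_ctx())?).await?;
        batches.extend_from_slice(&data);
    }
    Ok(batches)
}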

#[tokio::test]
@@ -3626,20 +3698,6 @@ mod tests {
let actual = get_data(&table).await;
assert_batches_sorted_eq!(&expected, &actual);

-        // Just checking that the data wasn't actually written instead!
-        if let Ok(files) = crate::storage::utils::flatten_list_stream(
-            &table.object_store(),
-            Some(&object_store::path::Path::from("_change_data")),
-        )
-        .await
-        {
-            assert_eq!(
-                1,
-                files.len(),
-                "This test should find written CDC files! {files:#?}"
-            );
-        }

let ctx = SessionContext::new();
let table = DeltaOps(table)
.load_cdf()
