Skip to content

Commit

Permalink
refactor: cdf merge, keep all ops in one single df
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
  • Loading branch information
ion-elgreco authored and rtyler committed Jan 20, 2025
1 parent 6f5cc26 commit 0cf4ff4
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 84 deletions.
145 changes: 64 additions & 81 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;

use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
Expand All @@ -56,7 +57,6 @@ use datafusion_expr::{

use filter::try_construct_early_filter;
use futures::future::BoxFuture;
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use serde::Serialize;
use tracing::log::*;
Expand Down Expand Up @@ -719,7 +719,6 @@ async fn execute(
// be disabled in the common case(s)
let should_cdc = should_write_cdc(&snapshot)?;
// Change data may be collected and then written out at the completion of the merge
let mut change_data = vec![];

if should_cdc {
debug!("Executing a merge and I should write CDC!");
Expand Down Expand Up @@ -790,7 +789,7 @@ async fn execute(
fn add_generated_columns(
mut df: DataFrame,
generated_cols: &Vec<GeneratedColumn>,
generated_cols_missing_in_source: &Vec<String>,
generated_cols_missing_in_source: &[String],
state: &SessionState,
) -> DeltaResult<DataFrame> {
debug!("Generating columns in dataframe");
Expand Down Expand Up @@ -1049,6 +1048,7 @@ async fn execute(

let mut new_columns = vec![];
let mut write_projection = Vec::new();
let mut write_projection_with_cdf = Vec::new();

for delta_field in snapshot.schema().fields() {
let mut when_expr = Vec::with_capacity(operations_size);
Expand Down Expand Up @@ -1086,9 +1086,26 @@ async fn execute(
Expr::Column(Column::from_name(name.clone())).alias(delta_field.name()),
delta_field.data_type().try_into()?,
));

write_projection_with_cdf.push(
when(
col(CDC_COLUMN_NAME).not_eq(lit("update_preimage")),
cast(
Expr::Column(Column::from_name(name.clone())),
delta_field.data_type().try_into()?,
),
)
.otherwise(cast(
Expr::Column(Column::new(Some(target_name.clone()), delta_field.name())), // We take the column from target table
delta_field.data_type().try_into()?,
))?
.alias(delta_field.name()),
);
new_columns.push((name, case));
}

write_projection_with_cdf.push(col("_change_type"));

let mut insert_when = Vec::with_capacity(ops.len());
let mut insert_then = Vec::with_capacity(ops.len());

Expand Down Expand Up @@ -1219,93 +1236,59 @@ async fn execute(

let operation_count = DataFrame::new(state.clone(), operation_count);

if should_cdc {
// Create a dataframe containing the CDC deletes which are present at this point
change_data.push(
operation_count
.clone()
.filter(col(DELETE_COLUMN))?
.select(write_projection.clone())?
.with_column(CDC_COLUMN_NAME, lit("delete"))?,
);
}

let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?;

if should_cdc {
debug!("The merge should trigger CDC tracking, computing pre/insert/postimage datasets");
let cdc_projection = filtered.clone().filter(col(OPERATION_COLUMN).not_eq(
// This is a copy operation, but I'm not sure how to turn that enum into an int
lit(5),
))?;

let cdc_insert_df = cdc_projection
let mut projected = if should_cdc {
operation_count
.clone()
.filter(
col(SOURCE_COLUMN)
.is_true()
.and(col(TARGET_COLUMN).is_null()),
.with_column(
CDC_COLUMN_NAME,
when(col(TARGET_DELETE_COLUMN).is_null(), lit("delete")) // nulls are equal to True
.when(col(DELETE_COLUMN).is_null(), lit("source_delete"))
.when(col(TARGET_COPY_COLUMN).is_null(), lit("copy"))
.when(col(TARGET_INSERT_COLUMN).is_null(), lit("insert"))
.when(col(TARGET_UPDATE_COLUMN).is_null(), lit("update"))
.end()?,
)?
.select(write_projection.clone())?
.with_column(CDC_COLUMN_NAME, lit("insert"))?;

change_data.push(cdc_insert_df);

let after = cdc_projection
.clone()
.filter(col(TARGET_COLUMN).is_true())?
.select(write_projection.clone())?;

// Extra select_columns is required so that before and after have same schema order
// DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650
let before = cdc_projection
.clone()
.filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
.select(
target_schema
.columns()
.iter()
.filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN)
.map(|c| Expr::Column(c.clone()))
.collect_vec(),
.drop_columns(&["__delta_rs_path"])? // WEIRD bug caused by interaction with unnest_columns, has to be dropped otherwise throws schema error
.with_column(
"__delta_rs_update_expanded",
when(
col(CDC_COLUMN_NAME).eq(lit("update")),
lit(ScalarValue::List(ScalarValue::new_list(
&[
ScalarValue::Utf8(Some("update_preimage".into())),
ScalarValue::Utf8(Some("update_postimage".into())),
],
&DataType::List(Field::new("element", DataType::Utf8, false).into()),
true,
))),
)
.end()?,
)?
.select_columns(
&after
.schema()
.columns()
.iter()
.map(|v| v.name())
.collect::<Vec<_>>(),
)?;

let tracker = CDCTracker::new(before, after);
change_data.push(tracker.collect()?);
}

let mut project = filtered.clone().select(write_projection)?;

if should_cdc && !change_data.is_empty() {
let mut df = change_data
.pop()
.expect("change_data should never be empty");
// Accumulate all the changes together into a single data frame to produce the necessary
// change data files
for change in change_data {
df = df.union(change)?;
}
project = project
.with_column(CDC_COLUMN_NAME, lit(ScalarValue::Null))?
.union(df)?;
}
.unnest_columns(&["__delta_rs_update_expanded"])?
.with_column(
CDC_COLUMN_NAME,
when(
col(CDC_COLUMN_NAME).eq(lit("update")),
col("__delta_rs_update_expanded"),
)
.otherwise(col(CDC_COLUMN_NAME))?,
)?
.drop_columns(&["__delta_rs_update_expanded"])?
.select(write_projection_with_cdf)?
} else {
operation_count
.filter(col(DELETE_COLUMN).is_false())?
.select(write_projection)?
};

project = add_generated_columns(
project,
projected = add_generated_columns(
projected,
&generated_col_expressions,
&missing_generated_columns,
&state,
)?;

let merge_final = &project.into_unoptimized_plan();
let merge_final = &projected.into_unoptimized_plan();
let write = state.create_physical_plan(merge_final).await?;

let err = || DeltaTableError::Generic("Unable to locate expected metric node".into());
Expand Down
18 changes: 15 additions & 3 deletions crates/core/src/operations/merge/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use arrow_schema::{Schema, SchemaRef as ArrowSchemaRef};
use datafusion::catalog::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion_expr::col;
use datafusion_expr::{col, lit};
use datafusion_physical_plan::ExecutionPlan;
use futures::StreamExt;
use object_store::prefix::PrefixStore;
Expand Down Expand Up @@ -170,9 +170,21 @@ pub(crate) async fn write_execution_plan_v2(

let normal_df = batch_df
.clone()
.filter(col(CDC_COLUMN_NAME).is_null())?
.filter(col(CDC_COLUMN_NAME).in_list(
vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
true,
))?
.drop_columns(&[CDC_COLUMN_NAME])?;
let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).is_not_null())?;

let cdf_df = batch_df.filter(col(CDC_COLUMN_NAME).in_list(
vec![
lit("delete"),
lit("insert"),
lit("update_preimage"),
lit("update_postimage"),
],
false,
))?;

let normal_batch =
concat_batches(&write_schema, &normal_df.collect().await?)?;
Expand Down

0 comments on commit 0cf4ff4

Please sign in to comment.