diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 15fea91ee9..419ecb0ade 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -249,6 +249,17 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); + let mut app_metadata = match app_metadata { + Some(meta) => meta, + None => HashMap::new(), + }; + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + if let Ok(map) = serde_json::to_value(&metrics) { + app_metadata.insert("operationMetrics".to_owned(), map); + } + // Do not make a commit when there are zero updates to the state if !actions.is_empty() { let operation = DeltaOperation::Delete { @@ -259,7 +270,7 @@ async fn execute( &actions, operation, snapshot, - app_metadata, + Some(app_metadata), ) .await?; } @@ -390,7 +401,7 @@ mod tests { assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); - let (table, metrics) = DeltaOps(table).delete().await.unwrap(); + let (mut table, metrics) = DeltaOps(table).delete().await.unwrap(); assert_eq!(table.version(), 2); assert_eq!(table.get_file_uris().count(), 0); @@ -399,6 +410,14 @@ mod tests { assert_eq!(metrics.num_deleted_rows, None); assert_eq!(metrics.num_copied_rows, None); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[commit_info.len() - 1]; + let extra_info = last_commit.info.clone(); + assert_eq!( + extra_info["operationMetrics"], + serde_json::to_value(&metrics).unwrap() + ); + // rewrite is not required assert_eq!(metrics.rewrite_time_ms, 0); diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index a97cd4a402..ca26b2d3ff 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -154,6 +154,17 @@ impl FileSystemCheckPlan { default_row_commit_version: file.default_row_commit_version, })); } + let metrics = FileSystemCheckMetrics { + dry_run: false, + files_removed: removed_file_paths, + }; + + let mut app_metadata = HashMap::new(); + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + if let Ok(map) = serde_json::to_value(&metrics) { + app_metadata.insert("operationMetrics".to_owned(), map); + } commit( self.log_store.as_ref(), @@ -161,14 +172,11 @@ impl FileSystemCheckPlan { DeltaOperation::FileSystemCheck {}, snapshot, // TODO pass through metadata - None, + Some(app_metadata), ) .await?; - Ok(FileSystemCheckMetrics { - dry_run: false, - files_removed: removed_file_paths, - }) + Ok(metrics) } } diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 8791c96e4d..419f2ceee2 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -1383,6 +1383,17 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; + let mut app_metadata = match app_metadata { + Some(meta) => meta, + None => HashMap::new(), + }; + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + if let Ok(map) = serde_json::to_value(&metrics) { + app_metadata.insert("operationMetrics".to_owned(), map); + } + // Do not make a commit when there are zero updates to the state if !actions.is_empty() { let operation = DeltaOperation::Merge { @@ -1396,7 +1407,7 @@ async fn execute( &actions, operation, snapshot, - app_metadata, + Some(app_metadata), ) .await?; } @@ -2046,6 +2057,11 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; let parameters = last_commit.operation_parameters.clone().unwrap(); + let extra_info = last_commit.info.clone(); + assert_eq!( + extra_info["operationMetrics"], + serde_json::to_value(&metrics).unwrap() + ); assert_eq!(parameters["predicate"], json!("target.id = source.id")); assert_eq!( parameters["matchedPredicates"], diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 78319c00b0..6d96b4e1dd 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -408,12 +408,24 @@ async fn execute( let operation = DeltaOperation::Update { predicate: Some(fmt_expr_to_sql(&predicate)?), }; + + let mut app_metadata = match app_metadata { + Some(meta) => meta, + None => HashMap::new(), + }; + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + if let Ok(map) = serde_json::to_value(&metrics) { + app_metadata.insert("operationMetrics".to_owned(), map); + } + version = commit( log_store.as_ref(), &actions, operation, snapshot, - app_metadata, + Some(app_metadata), ) .await?; @@ -844,7 +856,7 @@ mod tests { // Validate order operators do not include nulls let table = prepare_values_table().await; - let (table, metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .update() .with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2)))) .with_update("value", lit(10)) @@ -857,6 +869,14 @@ mod tests { assert_eq!(metrics.num_updated_rows, 2); assert_eq!(metrics.num_copied_rows, 3); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[commit_info.len() - 1]; + let extra_info = last_commit.info.clone(); + assert_eq!( + extra_info["operationMetrics"], + serde_json::to_value(&metrics).unwrap() + ); + let expected = [ "+-------+", "| value |",