From 4d533ccc9ad1aed974c5c3a238a62f013686ae27 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Jan 2024 16:51:57 +0100 Subject: [PATCH 1/9] extend commit in merge --- .../src/operations/merge/mod.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 7cb752dc21..069bfb0895 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -553,7 +553,7 @@ impl MergeOperationConfig { } } -#[derive(Default, Serialize, Debug)] +#[derive(Default, Serialize, Clone, Debug)] /// Metrics for the Merge Operation pub struct MergeMetrics { /// Number of rows in the source data @@ -1390,6 +1390,21 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; + + let mut c_metadata = match app_metadata { + Some(meta) => meta, + None => HashMap::new() + }; + + c_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + let merge_metrics = serde_json::to_value(metrics.clone()); + + if let Ok(map) = merge_metrics { + c_metadata.insert("operationMetrics".to_owned(), map); + } + + // Do not make a commit when there are zero updates to the state if !actions.is_empty() { let operation = DeltaOperation::Merge { @@ -1403,7 +1418,7 @@ async fn execute( &actions, operation, snapshot, - app_metadata, + Some(c_metadata), ) .await?; } From 94c5366f30dcc5b5a57ee7dadba446790558019a Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Jan 2024 17:04:46 +0100 Subject: [PATCH 2/9] add operationMetrics in commot for other operations --- .../deltalake-core/src/operations/delete.rs | 19 +++++++++++++++-- .../src/operations/filesystem_check.rs | 21 +++++++++++++------ .../src/operations/merge/mod.rs | 8 +++---- .../deltalake-core/src/operations/update.rs | 18 ++++++++++++++-- 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 15fea91ee9..3f11c20c3e 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -63,7 +63,7 @@ pub struct DeleteBuilder { app_metadata: Option>, } -#[derive(Default, Debug, Serialize)] +#[derive(Default, Debug, Clone, Serialize)] /// Metrics for the Delete Operation pub struct DeleteMetrics { /// Number of files added @@ -249,6 +249,21 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); + + let mut app_metadata = match app_metadata { + Some(meta) => meta, + None => HashMap::new() + }; + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + let delete_metrics = serde_json::to_value(metrics.clone()); + + if let Ok(map) = delete_metrics { + app_metadata.insert("operationMetrics".to_owned(), map); + } + + // Do not make a commit when there are zero updates to the state if !actions.is_empty() { let operation = DeltaOperation::Delete { @@ -259,7 +274,7 @@ async fn execute( &actions, operation, snapshot, - app_metadata, + Some(app_metadata), ) .await?; } diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index b79f22b1f4..3ba7a24479 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -45,7 +45,7 @@ pub struct FileSystemCheckBuilder { } /// Details of the FSCK operation including which files were removed from the log -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct FileSystemCheckMetrics { /// Was this a dry run pub dry_run: bool, @@ -154,6 +154,18 @@ impl FileSystemCheckPlan { default_row_commit_version: file.default_row_commit_version, })); } + let metrics = FileSystemCheckMetrics { + dry_run: false, + files_removed: removed_file_paths, + }; + + let mut app_metadata = HashMap::new(); + let fsck_metrics = serde_json::to_value(metrics.clone()); + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + if let Ok(map) = fsck_metrics { + app_metadata.insert("operationMetrics".to_owned(), map); + } commit( self.log_store.as_ref(), @@ -161,14 +173,11 @@ impl FileSystemCheckPlan { DeltaOperation::FileSystemCheck {}, snapshot, // TODO pass through metadata - None, + Some(app_metadata), ) .await?; - Ok(FileSystemCheckMetrics { - dry_run: false, - files_removed: removed_file_paths, - }) + Ok(metrics) } } diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 069bfb0895..995ba6cadd 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -1391,17 +1391,17 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; - let mut c_metadata = match app_metadata { + let mut app_metadata = match app_metadata { Some(meta) => meta, None => HashMap::new() }; - c_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); let merge_metrics = serde_json::to_value(metrics.clone()); if let Ok(map) = merge_metrics { - c_metadata.insert("operationMetrics".to_owned(), map); + app_metadata.insert("operationMetrics".to_owned(), map); } @@ -1418,7 +1418,7 @@ async fn execute( &actions, operation, snapshot, - Some(c_metadata), + Some(app_metadata), ) .await?; } diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 78319c00b0..cc51d65e10 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -78,7 +78,7 @@ pub struct UpdateBuilder { safe_cast: bool, } -#[derive(Default, Serialize, Debug)] +#[derive(Default, Clone, Serialize, Debug)] /// Metrics collected during the Update operation pub struct UpdateMetrics { /// Number of files added. @@ -408,12 +408,26 @@ async fn execute( let operation = DeltaOperation::Update { predicate: Some(fmt_expr_to_sql(&predicate)?), }; + + let mut app_metadata = match app_metadata { + Some(meta) => meta, + None => HashMap::new() + }; + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + let update_metrics = serde_json::to_value(metrics.clone()); + + if let Ok(map) = update_metrics { + app_metadata.insert("operationMetrics".to_owned(), map); + } + version = commit( log_store.as_ref(), &actions, operation, snapshot, - app_metadata, + Some(app_metadata), ) .await?; From d0402737e2451039b730d4d23771fdcd1a63ae65 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Jan 2024 17:07:57 +0100 Subject: [PATCH 3/9] fmt --- crates/deltalake-core/src/operations/delete.rs | 8 +++----- crates/deltalake-core/src/operations/merge/mod.rs | 8 +++----- crates/deltalake-core/src/operations/update.rs | 6 +++--- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 3f11c20c3e..37d5a99183 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -249,12 +249,11 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); - let mut app_metadata = match app_metadata { Some(meta) => meta, - None => HashMap::new() - }; - + None => HashMap::new(), + }; + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); let delete_metrics = serde_json::to_value(metrics.clone()); @@ -262,7 +261,6 @@ async fn execute( if let Ok(map) = delete_metrics { app_metadata.insert("operationMetrics".to_owned(), map); } - // Do not make a commit when there are zero updates to the state if !actions.is_empty() { diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 995ba6cadd..d1e4f00239 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -1390,12 +1390,11 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; - let mut app_metadata = match app_metadata { Some(meta) => meta, - None => HashMap::new() - }; - + None => HashMap::new(), + }; + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); let merge_metrics = serde_json::to_value(metrics.clone()); @@ -1403,7 +1402,6 @@ async fn execute( if let Ok(map) = merge_metrics { app_metadata.insert("operationMetrics".to_owned(), map); } - // Do not make a commit when there are zero updates to the state if !actions.is_empty() { diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index cc51d65e10..1fad08e820 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -411,9 +411,9 @@ async fn execute( let mut app_metadata = match app_metadata { Some(meta) => meta, - None => HashMap::new() - }; - + None => HashMap::new(), + }; + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); let update_metrics = serde_json::to_value(metrics.clone()); From af6c1c1cdeb0c733fd3b2d95e99e1b05010eab6b Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Jan 2024 17:20:42 +0100 Subject: [PATCH 4/9] refactor the commit --- .../deltalake-core/src/operations/restore.rs | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index a10248bcb0..ccc1277adb 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -21,7 +21,7 @@ //! ```` use std::cmp::max; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::BitXor; use std::time::{SystemTime, UNIX_EPOCH}; @@ -33,7 +33,7 @@ use serde::Serialize; use crate::kernel::{Action, Add, Protocol, Remove}; use crate::logstore::LogStoreRef; -use crate::operations::transaction::{prepare_commit, TransactionError}; +use crate::operations::transaction::commit; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -60,7 +60,7 @@ impl From for DeltaTableError { } /// Metrics from Restore -#[derive(Default, Debug, Serialize)] +#[derive(Default, Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct RestoreMetrics { /// Number of files removed @@ -238,27 +238,28 @@ async fn execute( actions.extend(files_to_add.into_iter().map(Action::Add)); actions.extend(files_to_remove.into_iter().map(Action::Remove)); - let commit = prepare_commit( - log_store.object_store().as_ref(), - &DeltaOperation::Restore { - version: version_to_restore, - datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), - }, + let mut app_metadata = HashMap::new(); + let restore_metrics = serde_json::to_value(metrics.clone()); + + app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); + + if let Ok(map) = restore_metrics { + app_metadata.insert("operationMetrics".to_owned(), map); + } + + let operation = DeltaOperation::Restore { + version: version_to_restore, + datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), + }; + + commit( + log_store.as_ref(), &actions, - None, + operation, + &snapshot, + Some(app_metadata), ) .await?; - let commit_version = snapshot.version() + 1; - match log_store.write_commit_entry(commit_version, &commit).await { - Ok(_) => {} - Err(err @ TransactionError::VersionAlreadyExists(_)) => { - return Err(err.into()); - } - Err(err) => { - log_store.object_store().delete(&commit).await?; - return Err(err.into()); - } - } Ok(metrics) } From 81ae502705aeea1cd177cb5a73706528f6b5d81a Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Jan 2024 17:36:30 +0100 Subject: [PATCH 5/9] remove incorrect test with the refactor --- crates/deltalake-core/tests/command_restore.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs index 2c1c06cbb6..9f3c498506 100644 --- a/crates/deltalake-core/tests/command_restore.rs +++ b/crates/deltalake-core/tests/command_restore.rs @@ -202,14 +202,3 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { assert!(result.is_ok()); Ok(()) } - -#[tokio::test] -async fn test_restore_transaction_conflict() -> Result<(), Box> { - let context = setup_test().await?; - let mut table = context.table; - table.load_version(2).await?; - - let result = DeltaOps(table).restore().with_version_to_restore(1).await; - assert!(result.is_err()); - Ok(()) -} From d8619e40184b26c88f0f34f227678d57a36f7273 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Jan 2024 18:26:29 +0100 Subject: [PATCH 6/9] extend test for roundtripping metrics --- crates/deltalake-core/src/operations/delete.rs | 10 +++++++++- crates/deltalake-core/src/operations/merge/mod.rs | 5 +++++ crates/deltalake-core/src/operations/update.rs | 10 +++++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 37d5a99183..7649760d2a 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -403,7 +403,7 @@ mod tests { assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); - let (table, metrics) = DeltaOps(table).delete().await.unwrap(); + let (mut table, metrics) = DeltaOps(table).delete().await.unwrap(); assert_eq!(table.version(), 2); assert_eq!(table.get_file_uris().count(), 0); @@ -412,6 +412,14 @@ mod tests { assert_eq!(metrics.num_deleted_rows, None); assert_eq!(metrics.num_copied_rows, None); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[commit_info.len() - 1]; + let extra_info = last_commit.info.clone(); + assert_eq!( + extra_info["operationMetrics"], + serde_json::to_value(metrics.clone()).unwrap() + ); + // rewrite is not required assert_eq!(metrics.rewrite_time_ms, 0); diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index d1e4f00239..6e42d690d7 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -2066,6 +2066,11 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; let parameters = last_commit.operation_parameters.clone().unwrap(); + let extra_info = last_commit.info.clone(); + assert_eq!( + extra_info["operationMetrics"], + serde_json::to_value(metrics.clone()).unwrap() + ); assert_eq!(parameters["predicate"], json!("target.id = source.id")); assert_eq!( parameters["matchedPredicates"], diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 1fad08e820..57aaed97ed 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -858,7 +858,7 @@ mod tests { // Validate order operators do not include nulls let table = prepare_values_table().await; - let (table, metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .update() .with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2)))) .with_update("value", lit(10)) @@ -871,6 +871,14 @@ mod tests { assert_eq!(metrics.num_updated_rows, 2); assert_eq!(metrics.num_copied_rows, 3); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[commit_info.len() - 1]; + let extra_info = last_commit.info.clone(); + assert_eq!( + extra_info["operationMetrics"], + serde_json::to_value(metrics.clone()).unwrap() + ); + let expected = [ "+-------+", "| value |", From 844c9608155a8b58447119842deaa61c6834996c Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 4 Jan 2024 01:25:16 +0100 Subject: [PATCH 7/9] add old lower level commit back --- .../deltalake-core/src/operations/restore.rs | 41 +++++++++---------- .../deltalake-core/tests/command_restore.rs | 11 +++++ 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index ccc1277adb..87b49b6811 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -21,7 +21,7 @@ //! ```` use std::cmp::max; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::ops::BitXor; use std::time::{SystemTime, UNIX_EPOCH}; @@ -33,7 +33,7 @@ use serde::Serialize; use crate::kernel::{Action, Add, Protocol, Remove}; use crate::logstore::LogStoreRef; -use crate::operations::transaction::commit; +use crate::operations::transaction::{prepare_commit, TransactionError}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -238,28 +238,27 @@ async fn execute( actions.extend(files_to_add.into_iter().map(Action::Add)); actions.extend(files_to_remove.into_iter().map(Action::Remove)); - let mut app_metadata = HashMap::new(); - let restore_metrics = serde_json::to_value(metrics.clone()); - - app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); - - if let Ok(map) = restore_metrics { - app_metadata.insert("operationMetrics".to_owned(), map); - } - - let operation = DeltaOperation::Restore { - version: version_to_restore, - datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), - }; - - commit( - log_store.as_ref(), + let commit = prepare_commit( + log_store.object_store().as_ref(), + &DeltaOperation::Restore { + version: version_to_restore, + datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), + }, &actions, - operation, - &snapshot, - Some(app_metadata), + None, ) .await?; + let commit_version = snapshot.version() + 1; + match log_store.write_commit_entry(commit_version, &commit).await { + Ok(_) => {} + Err(err @ TransactionError::VersionAlreadyExists(_)) => { + return Err(err.into()); + } + Err(err) => { + log_store.object_store().delete(&commit).await?; + return Err(err.into()); + } + } Ok(metrics) } diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs index 7a64434b93..9b77662ce4 100644 --- a/crates/deltalake-core/tests/command_restore.rs +++ b/crates/deltalake-core/tests/command_restore.rs @@ -209,3 +209,14 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { assert!(result.is_ok()); Ok(()) } + +#[tokio::test] +async fn test_restore_transaction_conflict() -> Result<(), Box> { + let context = setup_test().await?; + let mut table = context.table; + table.load_version(2).await?; + + let result = DeltaOps(table).restore().with_version_to_restore(1).await; + assert!(result.is_err()); + Ok(()) +} From 647ef387bea47aaf39addadd3b875eb1e9865272 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 4 Jan 2024 12:20:58 +0100 Subject: [PATCH 8/9] Update crates/deltalake-core/src/operations/delete.rs Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com> --- crates/deltalake-core/src/operations/delete.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 7649760d2a..99af37aa0f 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -256,9 +256,7 @@ async fn execute( app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); - let delete_metrics = serde_json::to_value(metrics.clone()); - - if let Ok(map) = delete_metrics { + if let Ok(map) = serde_json::to_value(&metrics) { app_metadata.insert("operationMetrics".to_owned(), map); } From 943ce677657ba8fa696ca9210ad0e8310fb12528 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 4 Jan 2024 12:26:37 +0100 Subject: [PATCH 9/9] use reference --- crates/deltalake-core/src/operations/delete.rs | 4 ++-- crates/deltalake-core/src/operations/filesystem_check.rs | 5 ++--- crates/deltalake-core/src/operations/merge/mod.rs | 8 +++----- crates/deltalake-core/src/operations/restore.rs | 2 +- crates/deltalake-core/src/operations/update.rs | 8 +++----- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 99af37aa0f..419ecb0ade 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -63,7 +63,7 @@ pub struct DeleteBuilder { app_metadata: Option>, } -#[derive(Default, Debug, Clone, Serialize)] +#[derive(Default, Debug, Serialize)] /// Metrics for the Delete Operation pub struct DeleteMetrics { /// Number of files added @@ -415,7 +415,7 @@ mod tests { let extra_info = last_commit.info.clone(); assert_eq!( extra_info["operationMetrics"], - serde_json::to_value(metrics.clone()).unwrap() + serde_json::to_value(&metrics).unwrap() ); // rewrite is not required diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index b3ecef1a45..ca26b2d3ff 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -45,7 +45,7 @@ pub struct FileSystemCheckBuilder { } /// Details of the FSCK operation including which files were removed from the log -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Serialize)] pub struct FileSystemCheckMetrics { /// Was this a dry run pub dry_run: bool, @@ -160,10 +160,9 @@ impl FileSystemCheckPlan { }; let mut app_metadata = HashMap::new(); - let fsck_metrics = serde_json::to_value(metrics.clone()); app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); - if let Ok(map) = fsck_metrics { + if let Ok(map) = serde_json::to_value(&metrics) { app_metadata.insert("operationMetrics".to_owned(), map); } diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 6dd2dbf04b..419f2ceee2 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -553,7 +553,7 @@ impl MergeOperationConfig { } } -#[derive(Default, Serialize, Clone, Debug)] +#[derive(Default, Serialize, Debug)] /// Metrics for the Merge Operation pub struct MergeMetrics { /// Number of rows in the source data @@ -1390,9 +1390,7 @@ async fn execute( app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); - let merge_metrics = serde_json::to_value(metrics.clone()); - - if let Ok(map) = merge_metrics { + if let Ok(map) = serde_json::to_value(&metrics) { app_metadata.insert("operationMetrics".to_owned(), map); } @@ -2062,7 +2060,7 @@ mod tests { let extra_info = last_commit.info.clone(); assert_eq!( extra_info["operationMetrics"], - serde_json::to_value(metrics.clone()).unwrap() + serde_json::to_value(&metrics).unwrap() ); assert_eq!(parameters["predicate"], json!("target.id = source.id")); assert_eq!( diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index 87b49b6811..a10248bcb0 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -60,7 +60,7 @@ impl From for DeltaTableError { } /// Metrics from Restore -#[derive(Default, Debug, Clone, Serialize)] +#[derive(Default, Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct RestoreMetrics { /// Number of files removed diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 57aaed97ed..6d96b4e1dd 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -78,7 +78,7 @@ pub struct UpdateBuilder { safe_cast: bool, } -#[derive(Default, Clone, Serialize, Debug)] +#[derive(Default, Serialize, Debug)] /// Metrics collected during the Update operation pub struct UpdateMetrics { /// Number of files added. @@ -416,9 +416,7 @@ async fn execute( app_metadata.insert("readVersion".to_owned(), snapshot.version().into()); - let update_metrics = serde_json::to_value(metrics.clone()); - - if let Ok(map) = update_metrics { + if let Ok(map) = serde_json::to_value(&metrics) { app_metadata.insert("operationMetrics".to_owned(), map); } @@ -876,7 +874,7 @@ mod tests { let extra_info = last_commit.info.clone(); assert_eq!( extra_info["operationMetrics"], - serde_json::to_value(metrics.clone()).unwrap() + serde_json::to_value(&metrics).unwrap() ); let expected = [