From 1e8d665a842799b778baedaa578902daaa04a704 Mon Sep 17 00:00:00 2001 From: Ilya Moshkov Date: Sun, 8 Jan 2023 23:06:36 +0400 Subject: [PATCH 1/3] save operational params in the same way with delta io --- rust/src/action/mod.rs | 7 ++++++- rust/tests/command_optimize.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 10a1528084..7e70778178 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -509,9 +509,14 @@ impl DeltaOperation { ); if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) { + let all_operation_fields = map.values().next().unwrap().as_object().unwrap(); + let converted_operation_fields: Map = all_operation_fields.iter() + .filter(|item| !item.1.is_null()) + .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))).collect(); + commit_info.insert( "operationParameters".to_string(), - map.values().next().unwrap().clone(), + serde_json::Value::Object(converted_operation_fields), ); }; diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index b559c16fc2..7f656fa513 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -493,7 +493,7 @@ async fn test_commit_info() -> Result<(), Box> { assert_eq!(last_commit["readVersion"], json!(version)); assert_eq!( last_commit["operationParameters"]["targetSize"], - json!(2_000_000) + json!("2000000") ); // TODO: Requires a string representation for PartitionFilter assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null); From d7bb852716e9f862b82114c4deef0ff2d0ca04e1 Mon Sep 17 00:00:00 2001 From: Ilya Moshkov Date: Sun, 8 Jan 2023 23:21:06 +0400 Subject: [PATCH 2/3] fix after format check --- rust/src/action/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 7e70778178..688c2814b1 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -510,9 +510,11 @@ impl DeltaOperation { if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) { let all_operation_fields = map.values().next().unwrap().as_object().unwrap(); - let converted_operation_fields: Map = all_operation_fields.iter() + let converted_operation_fields: Map = all_operation_fields + .iter() .filter(|item| !item.1.is_null()) - .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))).collect(); + .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))) + .collect(); commit_info.insert( "operationParameters".to_string(), From af1a0be51d53b4c05e6393bd62c8c54251eb237a Mon Sep 17 00:00:00 2001 From: Ilya Moshkov Date: Sun, 15 Jan 2023 22:41:22 +0400 Subject: [PATCH 3/3] add test for commit info format check * fix commit info building --- rust/src/action/mod.rs | 11 ++++++++- rust/tests/commit_info_format.rs | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 rust/tests/commit_info_format.rs diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 688c2814b1..eab1fdff0b 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -513,7 +513,16 @@ impl DeltaOperation { let converted_operation_fields: Map = all_operation_fields .iter() .filter(|item| !item.1.is_null()) - .map(|(k, v)| (k.clone(), serde_json::Value::String(v.to_string()))) + .map(|(k, v)| { + ( + k.clone(), + serde_json::Value::String(if v.is_string() { + String::from(v.as_str().unwrap()) + } else { + v.to_string() + }), + ) + }) .collect(); commit_info.insert( diff --git a/rust/tests/commit_info_format.rs b/rust/tests/commit_info_format.rs new file mode 100644 index 0000000000..41cd5f6514 --- /dev/null +++ b/rust/tests/commit_info_format.rs @@ -0,0 +1,40 @@ +#[allow(dead_code)] +mod fs_common; + +use deltalake::action::{Action, DeltaOperation, SaveMode}; + +use serde_json::{json, Value}; +use std::error::Error; + +#[tokio::test] +async fn test_operational_parameters() -> Result<(), Box> { + let path = "./tests/data/operational_parameters"; + let mut table = fs_common::create_table(path, None).await; + + let add = fs_common::add(0); + + let operation = DeltaOperation::Write { + mode: SaveMode::Append, + partition_by: Some(vec!["some_partition".to_string()]), + predicate: None, + }; + + let mut tx = table.create_transaction(None); + let actions = vec![Action::add(add.clone())]; + tx.add_actions(actions); + tx.commit(Some(operation), None).await.unwrap(); + + let commit_info = table.history(None).await?; + let last_commit = &commit_info[commit_info.len() - 1]; + + assert_eq!(last_commit["operationParameters"]["mode"], json!("Append")); + + assert_eq!( + last_commit["operationParameters"]["partitionBy"], + json!("[\"some_partition\"]") + ); + + assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null); + + Ok(()) +}