Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): add more commit info to most operations #2009

Merged
merged 12 commits into from
Jan 4, 2024
23 changes: 21 additions & 2 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,17 @@ async fn execute(

metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros();

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

// Do not make a commit when there are zero updates to the state
if !actions.is_empty() {
let operation = DeltaOperation::Delete {
Expand All @@ -259,7 +270,7 @@ async fn execute(
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;
}
Expand Down Expand Up @@ -390,7 +401,7 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

let (table, metrics) = DeltaOps(table).delete().await.unwrap();
let (mut table, metrics) = DeltaOps(table).delete().await.unwrap();

assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 0);
Expand All @@ -399,6 +410,14 @@ mod tests {
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);

// rewrite is not required
assert_eq!(metrics.rewrite_time_ms, 0);

Expand Down
18 changes: 13 additions & 5 deletions crates/deltalake-core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,29 @@ impl FileSystemCheckPlan {
default_row_commit_version: file.default_row_commit_version,
}));
}
let metrics = FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
};

let mut app_metadata = HashMap::new();

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());
if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

commit(
self.log_store.as_ref(),
&actions,
DeltaOperation::FileSystemCheck {},
snapshot,
// TODO pass through metadata
None,
Some(app_metadata),
)
.await?;

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
Ok(metrics)
}
}

Expand Down
18 changes: 17 additions & 1 deletion crates/deltalake-core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,17 @@ async fn execute(

metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64;

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

// Do not make a commit when there are zero updates to the state
if !actions.is_empty() {
let operation = DeltaOperation::Merge {
Expand All @@ -1396,7 +1407,7 @@ async fn execute(
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;
}
Expand Down Expand Up @@ -2046,6 +2057,11 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let parameters = last_commit.operation_parameters.clone().unwrap();
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
Expand Down
24 changes: 22 additions & 2 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,24 @@ async fn execute(
let operation = DeltaOperation::Update {
predicate: Some(fmt_expr_to_sql(&predicate)?),
};

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

version = commit(
log_store.as_ref(),
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;

Expand Down Expand Up @@ -844,7 +856,7 @@ mod tests {

// Validate order operators do not include nulls
let table = prepare_values_table().await;
let (table, metrics) = DeltaOps(table)
let (mut table, metrics) = DeltaOps(table)
.update()
.with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2))))
.with_update("value", lit(10))
Expand All @@ -857,6 +869,14 @@ mod tests {
assert_eq!(metrics.num_updated_rows, 2);
assert_eq!(metrics.num_copied_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);

let expected = [
"+-------+",
"| value |",
Expand Down
Loading