Skip to content

Commit

Permalink
feat: check invariants in write command (#980)
Browse files Browse the repository at this point in the history
# Description

This PR integrates the `DeltaDataChecker` in the write path of the
operations API. I was unsure whether to integrate this in the
(experimental) writer in the operations module, but opted for keeping
the writer itself focussed on the lower level write operations.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
roeap authored Dec 3, 2022
1 parent 4efc67c commit be1073c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool>
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
invariants: Vec<Invariant>,
ctx: SessionContext,
Expand Down
64 changes: 61 additions & 3 deletions rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::{transaction::commit, CreateBuilder};
use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode};
use crate::builder::DeltaTableBuilder;
use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
use crate::delta_datafusion::DeltaDataChecker;
use crate::schema::Schema;
use crate::storage::DeltaObjectStore;
use crate::writer::record_batch::divide_by_partition_values;
Expand Down Expand Up @@ -319,6 +320,12 @@ impl std::future::IntoFuture for WriteBuilder {
Err(WriteError::MissingData)
}?;

let invariants = table
.get_metadata()
.and_then(|meta| meta.schema.get_invariants())
.unwrap_or_default();
let checker = DeltaDataChecker::new(invariants);

// Write data to disk
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
Expand All @@ -333,12 +340,14 @@ impl std::future::IntoFuture for WriteBuilder {
this.write_batch_size,
);
let mut writer = DeltaWriter::new(object_store.clone(), config);

let checker_stream = checker.clone();
let mut stream = inner_plan.execute(i, task_ctx)?;
let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
tokio::task::spawn(async move {
while let Some(batch) = stream.next().await {
writer.write(&batch?).await?;
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
checker_stream.check_batch(&batch).await?;
writer.write(&batch).await?;
}
writer.close().await
});
Expand Down Expand Up @@ -429,6 +438,7 @@ mod tests {
use super::*;
use crate::operations::DeltaOps;
use crate::writer::test_utils::{get_delta_schema, get_record_batch};
use serde_json::json;

#[tokio::test]
async fn test_create_write() {
Expand Down Expand Up @@ -503,4 +513,52 @@ mod tests {
assert_eq!(table.version(), 0);
assert_eq!(table.get_file_uris().count(), 4)
}

#[tokio::test]
async fn test_check_invariants() {
let batch = get_record_batch(None, false);
let schema: Schema = serde_json::from_value(json!({
"type": "struct",
"fields": [
{"name": "id", "type": "string", "nullable": true, "metadata": {}},
{"name": "value", "type": "integer", "nullable": true, "metadata": {
"delta.invariants": "{\"expression\": { \"expression\": \"value < 12\"} }"
}},
{"name": "modified", "type": "string", "nullable": true, "metadata": {}},
]
}))
.unwrap();
let table = DeltaOps::new_in_memory()
.create()
.with_save_mode(SaveMode::ErrorIfExists)
.with_columns(schema.get_fields().clone())
.await
.unwrap();
assert_eq!(table.version(), 0);

let table = DeltaOps(table).write(vec![batch.clone()]).await.unwrap();
assert_eq!(table.version(), 1);

let schema: Schema = serde_json::from_value(json!({
"type": "struct",
"fields": [
{"name": "id", "type": "string", "nullable": true, "metadata": {}},
{"name": "value", "type": "integer", "nullable": true, "metadata": {
"delta.invariants": "{\"expression\": { \"expression\": \"value < 6\"} }"
}},
{"name": "modified", "type": "string", "nullable": true, "metadata": {}},
]
}))
.unwrap();
let table = DeltaOps::new_in_memory()
.create()
.with_save_mode(SaveMode::ErrorIfExists)
.with_columns(schema.get_fields().clone())
.await
.unwrap();
assert_eq!(table.version(), 0);

let table = DeltaOps(table).write(vec![batch.clone()]).await;
assert!(table.is_err())
}
}

0 comments on commit be1073c

Please sign in to comment.