feat: optimistic transaction protocol (#632)
# Description

This PR adds a `ConflictChecker` struct for conflict resolution when concurrent commits fail. The implementation is heavily inspired by the [reference
implementation](https://github.com/delta-io/delta/blob/fe36a53f3c70c5f9c9b5052c12cd1703f495da97/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala).
So far, most of the Spark tests that specifically target conflict resolution are covered.
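
To make the retry flow concrete, here is a minimal sketch of the optimistic commit loop as I think about it. All types and helpers below (`try_write_commit`, `read_commit`, the error types) are made-up placeholders for illustration, not the API introduced in this PR:

```rust
/// Illustrative stand-ins only; the real crate uses its own action and state types.
struct Action;

/// Raised when the log entry for the targeted version already exists.
struct VersionAlreadyExists;

#[derive(Debug)]
struct CommitConflictError(String);

/// Placeholder for the new `ConflictChecker`: compares our transaction against
/// the actions of the commit that won the race for the targeted version.
struct ConflictChecker;

impl ConflictChecker {
    fn check(
        &self,
        _our_actions: &[Action],
        _winning_actions: &[Action],
    ) -> Result<(), CommitConflictError> {
        // The real checks cover concurrently added/deleted files, metadata and
        // protocol changes, idempotent transactions, etc.
        Ok(())
    }
}

// Hypothetical helpers standing in for the object-store backed delta log.
fn try_write_commit(_version: i64, _actions: &[Action]) -> Result<(), VersionAlreadyExists> {
    Ok(())
}

fn read_commit(_version: i64) -> Vec<Action> {
    Vec::new()
}

/// Optimistic commit: try to write version N; if N was already taken by a
/// concurrent writer, check for conflicts against that commit and retry at N + 1.
fn commit_with_retries(
    actions: &[Action],
    mut version: i64,
    max_retries: usize,
) -> Result<i64, CommitConflictError> {
    for _ in 0..max_retries {
        match try_write_commit(version, actions) {
            Ok(()) => return Ok(version),
            Err(VersionAlreadyExists) => {
                let winning = read_commit(version);
                ConflictChecker.check(actions, &winning)?;
                version += 1;
            }
        }
    }
    Err(CommitConflictError("exceeded maximum commit retries".to_string()))
}
```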

Working on this, I thought a bit about what we may want to consider going
forward as we move through the protocol versions :). In the end we could
end up with three main structs involved in validating a commit; a rough
sketch of how they could fit together follows the list below.

* The existing `DataChecker`, which validates and potentially mutates
data when writing data files to disk (currently supports invariants).
* The upcoming `ConflictChecker`, which checks whether a commit can be
retried in case of commit conflicts.
* A new `CommitChecker`, which does a priori validation of the commit
itself (e.g. append-only and other rules covered by tests in
[spark](https://github.com/delta-io/delta/blob/master/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala)).
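
For illustration, here is a loose sketch of how those three pieces could divide the work on a single commit path. Every name below, types and helpers alike, is made up for the sake of the example and not part of this PR:

```rust
// Stand-in types; the real code works with Arrow batches and Delta log actions.
type Batch = Vec<String>;
type Action = String;
type Version = i64;

#[derive(Debug)]
struct CheckError(String);

/// `DataChecker`: validates (and potentially mutates) rows while the data files
/// are written (invariants today).
fn check_data(batches: Vec<Batch>) -> Result<Vec<Batch>, CheckError> {
    Ok(batches)
}

/// `CommitChecker`: a priori validation of the commit itself, e.g. append-only rules.
fn check_commit(_actions: &[Action]) -> Result<(), CheckError> {
    Ok(())
}

/// `ConflictChecker`: consulted only when the targeted log version was taken by a
/// concurrent writer; decides whether the commit can simply be retried.
fn resolve_conflicts_and_retry(_actions: &[Action]) -> Result<Version, CheckError> {
    Ok(1)
}

fn write_files(batches: Vec<Batch>) -> Vec<Action> {
    batches.into_iter().map(|_| "add".to_string()).collect()
}

fn write_and_commit(batches: Vec<Batch>) -> Result<Version, CheckError> {
    let add_actions = write_files(check_data(batches)?);
    check_commit(&add_actions)?;
    resolve_conflicts_and_retry(&add_actions)
}
```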

My hope is to get this PR merged right after we release `0.8.0`, so
there is some time to fill in the remaining gaps and fully leverage the
new feature for `0.9.0`.

If folks agree, I would open some issues and start work on some
follow-ups.

## Follow-ups
* Extend `ConflictChecker` to support conflict resolution for streaming
transactions
* Implement `CommitChecker`
* Deprecate the old commit function
* Extend `DataChecker`
* Consolidate record batch writer implementations

# Related Issue(s)

part of #593 


---------

Co-authored-by: Will Jones <willjones127@gmail.com>
roeap and wjones127 authored Apr 7, 2023
1 parent d9920aa commit 490122c
Showing 15 changed files with 1,533 additions and 101 deletions.
8 changes: 6 additions & 2 deletions rust/Cargo.toml
@@ -13,7 +13,7 @@ readme = "README.md"
edition = "2021"

[dependencies]
-arrow = { version = "33.0.0", optional = true }
+arrow = { version = "33", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -26,7 +26,7 @@ log = "0"
libc = ">=0.2.90, <1"
num-bigint = "0.4"
num-traits = "0.2.15"
-object_store = "0.5.3"
+object_store = "0.5.6"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "33", features = [
@@ -57,6 +57,8 @@ datafusion = { version = "19", optional = true }
datafusion-expr = { version = "19", optional = true }
datafusion-common = { version = "19", optional = true }
datafusion-proto = { version = "19", optional = true }
+datafusion-sql = { version = "19", optional = true }
+sqlparser = { version = "0.30", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
@@ -91,6 +93,8 @@ datafusion = [
"datafusion-expr",
"datafusion-common",
"datafusion-proto",
"datafusion-sql",
"sqlparser",
"arrow",
"parquet",
]
22 changes: 21 additions & 1 deletion rust/src/action/mod.rs
@@ -544,6 +544,13 @@ pub enum DeltaOperation {
/// The predicate used during the write.
predicate: Option<String>,
},

+    /// Delete data matching predicate from delta table
+    Delete {
+        /// The condition the data to be deleted must match
+        predicate: Option<String>,
+    },

/// Represents a Delta `StreamingUpdate` operation.
#[serde(rename_all = "camelCase")]
StreamingUpdate {
@@ -580,6 +587,7 @@ impl DeltaOperation {
}
DeltaOperation::Create { .. } => "CREATE TABLE",
DeltaOperation::Write { .. } => "WRITE",
+            DeltaOperation::Delete { .. } => "DELETE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
@@ -622,7 +630,8 @@ impl DeltaOperation {
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
-            | Self::Write { .. } => true,
+            | Self::Write { .. }
+            | Self::Delete { .. } => true,
}
}

@@ -641,9 +650,20 @@ impl DeltaOperation {
match self {
// TODO add more operations
Self::Write { predicate, .. } => predicate.clone(),
+            Self::Delete { predicate, .. } => predicate.clone(),
_ => None,
}
}

+    /// Denotes if the operation reads the entire table
+    pub fn read_whole_table(&self) -> bool {
+        match self {
+            // TODO just adding one operation example, as currently none of the
+            // implemented operations scan the entire table.
+            Self::Write { predicate, .. } if predicate.is_none() => false,
+            _ => false,
+        }
+    }
}

/// The SaveMode used when performing a DeltaOperation
Expand Down
7 changes: 7 additions & 0 deletions rust/src/delta.rs
@@ -17,6 +17,7 @@ use super::schema::*;
use super::table_state::DeltaTableState;
use crate::action::{Add, Stats};
use crate::delta_config::DeltaConfigError;
+use crate::operations::transaction::TransactionError;
use crate::operations::vacuum::VacuumBuilder;
use crate::storage::{commit_uri_from_version, ObjectStoreRef};

@@ -216,6 +217,12 @@ pub enum DeltaTableError {
#[from]
source: std::io::Error,
},
+    /// Error raised while committing transaction
+    #[error("Transaction failed: {source}")]
+    Transaction {
+        /// The source error
+        source: TransactionError,
+    },
    /// Error returned when a transaction fails to commit because the given version already exists.
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(DeltaDataTypeVersion),
4 changes: 2 additions & 2 deletions rust/src/delta_datafusion.rs
@@ -390,7 +390,7 @@ impl TableProvider for DeltaTable {
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
-            let files_to_prune = pruning_predicate.prune(self)?;
+            let files_to_prune = pruning_predicate.prune(&self.state)?;
self.get_state()
.files()
.iter()
@@ -630,7 +630,7 @@ fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> {
}
}

-fn to_correct_scalar_value(
+pub(crate) fn to_correct_scalar_value(
stat_val: &serde_json::Value,
field_dt: &ArrowDataType,
) -> Option<ScalarValue> {
5 changes: 2 additions & 3 deletions rust/src/operations/create.rs
@@ -285,7 +285,6 @@ impl std::future::IntoFuture for CreateBuilder {

Box::pin(async move {
let mode = this.mode.clone();
-            let metadata = this.metadata.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
if table.object_store().is_delta_table_location().await? {
match mode {
@@ -302,10 +301,10 @@ impl std::future::IntoFuture for CreateBuilder {
}
let version = commit(
table.object_store().as_ref(),
-                0,
&actions,
operation,
-                metadata,
+                &table.state,
+                None,
)
.await?;
table.load_version(version).await?;
16 changes: 5 additions & 11 deletions rust/src/operations/filesystem_check.rs
@@ -15,7 +15,6 @@ use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
-use crate::DeltaDataTypeVersion;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
use futures::future::BoxFuture;
use futures::StreamExt;
@@ -50,8 +49,6 @@ pub struct FileSystemCheckMetrics {
}

struct FileSystemCheckPlan {
-    /// Version of the snapshot provided
-    version: DeltaDataTypeVersion,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
    /// Files that no longer exist in the underlying ObjectStore but have active add actions
@@ -88,7 +85,6 @@ impl FileSystemCheckBuilder {
async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_relative: HashMap<&str, &Add> =
HashMap::with_capacity(self.state.files().len());
-        let version = self.state.version();
let store = self.store.clone();

for active in self.state.files() {
@@ -118,14 +114,13 @@

Ok(FileSystemCheckPlan {
files_to_remove,
-            version,
store,
})
}
}

impl FileSystemCheckPlan {
-    pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
+    pub async fn execute(self, snapshot: &DeltaTableState) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
@@ -135,8 +130,6 @@

let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
-        let version = self.version;
-        let store = &self.store;

for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
@@ -154,10 +147,11 @@
}

commit(
-            store,
-            version + 1,
+            self.store.as_ref(),
&actions,
DeltaOperation::FileSystemCheck {},
+            snapshot,
+            // TODO pass through metadata
None,
)
.await?;
@@ -188,7 +182,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder {
));
}

-            let metrics = plan.execute().await?;
+            let metrics = plan.execute(&this.state).await?;
let mut table = DeltaTable::new_with_state(this.store, this.state);
table.update().await?;
Ok((table, metrics))
10 changes: 7 additions & 3 deletions rust/src/operations/optimize.rs
@@ -173,7 +173,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.target_size.to_owned(),
writer_properties,
)?;
-            let metrics = plan.execute(this.store.clone()).await?;
+            let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
table.update().await?;
Ok((table, metrics))
@@ -270,7 +270,11 @@ pub struct MergePlan {

impl MergePlan {
    /// Perform the operations outlined in the plan.
-    pub async fn execute(self, object_store: ObjectStoreRef) -> Result<Metrics, DeltaTableError> {
+    pub async fn execute(
+        self,
+        object_store: ObjectStoreRef,
+        snapshot: &DeltaTableState,
+    ) -> Result<Metrics, DeltaTableError> {
let mut actions = vec![];
let mut metrics = self.metrics;

Expand Down Expand Up @@ -368,9 +372,9 @@ impl MergePlan {

commit(
object_store.as_ref(),
-            self.read_table_version + 1,
&actions,
self.input_parameters.into(),
+            snapshot,
Some(metadata),
)
.await?;
