Skip to content

Commit

Permalink
implement conflict check & table read
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda committed Apr 1, 2024
1 parent 723105b commit 7769924
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 42 deletions.
34 changes: 12 additions & 22 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::kernel::StructType;
use crate::logstore::LogStore;
use crate::operations::transaction::CommitData;
use crate::table::config::TableConfig;
use crate::table::state::AppTransactionVisitor;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

mod log_data;
Expand Down Expand Up @@ -333,20 +332,12 @@ impl Snapshot {
}
}

/// TODO!
/// Allows hooking into the reading of commit files and checkpoints whenever a table is loaded or updated.
pub trait ReplayVistor: Send {
/// TODO!
/// Process a batch
fn visit_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()>;
}

struct PrintVistor {}
impl ReplayVistor for PrintVistor {
fn visit_batch(&mut self, _batch: &RecordBatch) -> DeltaResult<()> {
println!("Hello world!");
Ok(())
}
}

/// A snapshot of a Delta table that has been eagerly loaded into memory.
#[derive(Debug, Clone, PartialEq)]
pub struct EagerSnapshot {
Expand All @@ -364,9 +355,7 @@ impl EagerSnapshot {
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
let mut p = PrintVistor {};
let mut i = AppTransactionVisitor::new();
Self::try_new_with_visitor(table_root, store, config, version, vec![&mut p, &mut i]).await
Self::try_new_with_visitor(table_root, store, config, version, vec![]).await
}

/// Create a new [`EagerSnapshot`] instance
Expand Down Expand Up @@ -397,18 +386,16 @@ impl EagerSnapshot {
}

/// Update the snapshot to the given version
pub async fn update(
pub async fn update<'a>(
&mut self,
log_store: Arc<dyn LogStore>,
target_version: Option<i64>,
visitors: Vec<&'a mut dyn ReplayVistor>,
) -> DeltaResult<()> {
dbg!("Update Call: {} {:?}", self.version(), target_version);
if Some(self.version()) == target_version {
return Ok(());
}

let mut p = PrintVistor {};
let visitors: Vec<&mut dyn ReplayVistor> = vec![&mut p];
let new_slice = self
.snapshot
.update_inner(log_store.clone(), target_version)
Expand All @@ -431,7 +418,6 @@ impl EagerSnapshot {
)
.boxed()
};
dbg!("here2");
let mapper = LogMapper::try_new(&self.snapshot)?;
let files =
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, visitors)?
Expand Down Expand Up @@ -511,8 +497,8 @@ impl EagerSnapshot {
pub fn advance<'a>(
&mut self,
commits: impl IntoIterator<Item = &'a CommitData>,
mut visitors: Vec<&'a mut dyn ReplayVistor>,
) -> DeltaResult<i64> {
println!("Advance!");
let mut metadata = None;
let mut protocol = None;
let mut send = Vec::new();
Expand Down Expand Up @@ -542,7 +528,11 @@ impl EagerSnapshot {
let mut scanner = LogReplayScanner::new();

for batch in actions {
files.push(scanner.process_files_batch(&batch?, true)?);
let batch = batch?;
files.push(scanner.process_files_batch(&batch, true)?);
for visitor in &mut visitors {
visitor.visit_batch(&batch)?;
}
}

let mapper = LogMapper::try_new(&self.snapshot)?;
Expand Down Expand Up @@ -817,7 +807,7 @@ mod tests {
let actions =
vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()];

let new_version = snapshot.advance(&actions)?;
let new_version = snapshot.advance(&actions, vec![])?;
assert_eq!(new_version, version + 1);

let new_files = snapshot.file_actions()?.map(|f| f.path).collect::<Vec<_>>();
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/kernel/snapshot/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ where
if matches!(res, Poll::Ready(None)) {
this.checkpoint.poll_next(cx).map(|b| match b {
Some(Ok(batch)) => {
dbg!("Checkpoint Batch");
for visitor in this.visitors.iter_mut() {
visitor.visit_batch(&batch).unwrap();
}
Expand Down
30 changes: 27 additions & 3 deletions crates/core/src/operations/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,24 @@ impl<'a> TransactionInfo<'a> {
) -> DeltaResult<Self> {
use datafusion::prelude::SessionContext;

use crate::kernel::Txn;

let session = SessionContext::new();
let read_predicates = read_predicates
.map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
.transpose()?;

let mut read_app_ids = HashSet::<String>::new();
for action in actions.iter() {
if let Action::Txn(Txn { app_id, .. }) = action {
read_app_ids.insert(app_id.clone());
}
}

Ok(Self {
txn_id: "".into(),
read_predicates,
read_app_ids: Default::default(),
read_app_ids: read_app_ids,
actions,
read_snapshot,
read_whole_table,
Expand All @@ -139,10 +149,18 @@ impl<'a> TransactionInfo<'a> {
actions: &'a Vec<Action>,
read_whole_table: bool,
) -> Self {
use crate::kernel::Txn;

let mut read_app_ids = HashSet::<String>::new();
for action in actions.iter() {
if let Action::Txn(Txn { app_id, .. }) = action {
read_app_ids.insert(app_id.clone());
}
}
Self {
txn_id: "".into(),
read_predicates,
read_app_ids: Default::default(),
read_app_ids: read_app_ids,
actions,
read_snapshot,
read_whole_table,
Expand All @@ -156,10 +174,16 @@ impl<'a> TransactionInfo<'a> {
actions: &'a Vec<Action>,
read_whole_table: bool,
) -> DeltaResult<Self> {
let mut read_app_ids = HashSet::<String>::new();
for action in actions.iter() {
if let Action::Txn(Txn { app_id, .. }) = action {
read_app_ids.insert(app_id.clone());
}
}
Ok(Self {
txn_id: "".into(),
read_predicates,
read_app_ids: Default::default(),
read_app_ids: read_app_ids,
actions,
read_snapshot,
read_whole_table,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ impl TableReference for DeltaTableState {
}

/// Data that was actually written to the log store.
#[derive(Debug)]
pub struct CommitData {
/// The actions
pub actions: Vec<Action>,
Expand Down Expand Up @@ -273,8 +274,6 @@ impl CommitData {
actions.push(Action::Txn(txn.clone()))
}

dbg!("{:?}", &actions);

Ok(CommitData {
actions,
operation,
Expand Down Expand Up @@ -568,6 +567,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}

/// A commit that successfully completed
#[derive(Debug)]
pub struct FinalizedCommit {
/// The winning version number of the commit
pub version: i64,
Expand Down
123 changes: 117 additions & 6 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,11 +812,10 @@ impl std::future::IntoFuture for WriteBuilder {
// created actions, it may be safe to assume, that we want to include all actions.
// then again, having only some tombstones may be misleading.
if let Some(mut snapshot) = this.snapshot {
dbg!("merge1!");
snapshot.merge(commit.data.actions, &commit.data.operation, commit.version)?;

Ok(DeltaTable::new_with_state(this.log_store, snapshot))
} else {
dbg!("update!");
let mut table = DeltaTable::new(this.log_store, Default::default());
table.update().await?;
Ok(table)
Expand Down Expand Up @@ -902,7 +901,7 @@ mod tests {
get_arrow_schema, get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
get_record_batch_with_nested_struct, setup_table_with_configuration,
};
use crate::DeltaConfigKey;
use crate::{checkpoints, DeltaConfigKey, DeltaTableBuilder};
use arrow::datatypes::Field;
use arrow::datatypes::Schema as ArrowSchema;
use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray};
Expand Down Expand Up @@ -1640,9 +1639,21 @@ mod tests {
}

#[tokio::test]
async fn test_app_txn() {
async fn test_app_txn_workload() {
// Test that the transaction ids can be read from different scenarios
// 1. Write new table to storage
// 2. Read new table
// 3. Write to table a new txn id and then update a different table state that uses the same underlying table
// 4. Write a checkpoint and read that checkpoint.

let tmp_dir = tempfile::tempdir().unwrap();
let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
dbg!("{:?}", &tmp_path);

let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap())
.await
.unwrap()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.with_partition_columns(["modified"])
Expand All @@ -1655,7 +1666,107 @@ mod tests {
assert_eq!(table.get_files_count(), 2);

let app_txns = table.get_app_transaction_version();
println!("{:?}", &app_txns);
dbg!("{:?}", &app_txns);
assert_eq!(app_txns.len(), 1);
assert_eq!(app_txns.get("my-app"), Some(&1));

// Test Txn Id can be read from existing table

let mut table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap())
.load()
.await
.unwrap();
let app_txns2 = table2.get_app_transaction_version();

assert_eq!(app_txns2.len(), 1);
assert_eq!(app_txns2.get("my-app"), Some(&1));
dbg!("{:?}", &app_txns2);

// Write new data to the table and check that `update` functions work

let table = DeltaOps::from(table)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)),
)
.await
.unwrap();

assert_eq!(table.version(), 1);
let app_txns = table.get_app_transaction_version();
dbg!("{:?}", &app_txns);
assert_eq!(app_txns.len(), 1);
assert_eq!(app_txns.get("my-app"), Some(&3));

dbg!("do update");
table2.update_incremental(None).await.unwrap();
assert_eq!(table2.version(), 1);
let app_txns2 = table2.get_app_transaction_version();
assert_eq!(app_txns2.len(), 1);
assert_eq!(app_txns2.get("my-app"), Some(&3));
dbg!("{:?}", &app_txns2);
dbg!("done update");

// Create a checkpoint and then load
dbg!("Load checkpoint");
checkpoints::create_checkpoint(&table).await.unwrap();
let table3 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap())
.load()
.await
.unwrap();
let app_txns3 = table2.get_app_transaction_version();
assert_eq!(app_txns3.len(), 1);
assert_eq!(app_txns3.get("my-app"), Some(&3));
assert_eq!(table3.version(), 1);
}

#[tokio::test]
async fn test_app_txn_conflict() {
// A conflict must be raised whenever the same application id is used for two concurrent transactions

let tmp_dir = tempfile::tempdir().unwrap();
let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();

let batch = get_record_batch(None, false);
let table = DeltaOps::try_from_uri(tmp_path.to_str().unwrap())
.await
.unwrap()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.with_partition_columns(["modified"])
.with_commit_properties(
CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)),
)
.await
.unwrap();
assert_eq!(table.version(), 0);

let table2 = DeltaTableBuilder::from_uri(tmp_path.to_str().unwrap())
.load()
.await
.unwrap();
assert_eq!(table2.version(), 0);

let table = DeltaOps::from(table)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 2)),
)
.await
.unwrap();
assert_eq!(table.version(), 1);

let res = DeltaOps::from(table2)
.write(vec![get_record_batch(None, false)])
.with_commit_properties(
CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 3)),
)
.await;

let err = res.err().unwrap();
assert_eq!(
err.to_string(),
"Transaction failed: Failed to commit transaction: Concurrent transaction failed."
);
}
}
7 changes: 0 additions & 7 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use object_store::{path::Path, ObjectStore};
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing::debug;

use self::builder::DeltaTableConfig;
use self::state::DeltaTableState;
Expand Down Expand Up @@ -341,15 +340,9 @@ impl DeltaTable {
&mut self,
max_version: Option<i64>,
) -> Result<(), DeltaTableError> {
dbg!(
"incremental update with version({}) and max_version({max_version:?})",
self.version(),
max_version,
);
match self.state.as_mut() {
Some(state) => state.update(self.log_store.clone(), max_version).await,
_ => {
dbg!("New!");
let state = DeltaTableState::try_new(
&Path::default(),
self.log_store.object_store(),
Expand Down
Loading

0 comments on commit 7769924

Please sign in to comment.