-
Notifications
You must be signed in to change notification settings - Fork 433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: optimistic transaction protocol #632
Changes from all commits
aedecde
224b826
3243ee5
bf55226
595e91b
9c4bb02
139c1a4
fcc7c90
bb4cfeb
1460d91
eabb5bd
3d91509
d3f10e6
9fea909
40611aa
03ec7bc
dbb8be9
9622ef5
3bdcd13
19c7064
2cfc391
e1e0b0b
dc96f19
8b65628
59df7b0
466999e
c8034b0
a048d67
1f923ac
58f23ea
a831d1a
fbb0083
5ac369a
bb5c4f0
a792dbe
c1bd4ea
adbfc09
a638b50
99ed5a6
0562e17
2a2c34f
0c1e5af
b8d40a7
1f0f27f
58b25aa
10486c5
24cc38a
1825a23
bd276b6
7821ea2
a149c81
cad5865
3c93e59
ad8141c
a844e79
b5c2eb0
f69bfc7
93dcfaf
cee283f
711c59d
c209fc9
b21c237
af22900
3bf44f7
7cfdde3
b4dc95e
906fa7c
d5eaa8c
2378123
f7ab749
4c61b1e
88072e9
576f5db
1ad09ad
fd9da63
0dda8cb
207d65f
97d3405
54b1a85
3d801e0
3eb23b3
d390884
68f1b6e
740eeff
bfcb393
1b1e4ad
4cbd158
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -544,6 +544,13 @@ pub enum DeltaOperation { | |
/// The predicate used during the write. | ||
predicate: Option<String>, | ||
}, | ||
|
||
/// Delete data matching predicate from delta table | ||
Delete { | ||
/// The condition the to be deleted data must match | ||
predicate: Option<String>, | ||
}, | ||
|
||
/// Represents a Delta `StreamingUpdate` operation. | ||
#[serde(rename_all = "camelCase")] | ||
StreamingUpdate { | ||
|
@@ -580,6 +587,7 @@ impl DeltaOperation { | |
} | ||
DeltaOperation::Create { .. } => "CREATE TABLE", | ||
DeltaOperation::Write { .. } => "WRITE", | ||
DeltaOperation::Delete { .. } => "DELETE", | ||
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE", | ||
DeltaOperation::Optimize { .. } => "OPTIMIZE", | ||
DeltaOperation::FileSystemCheck { .. } => "FSCK", | ||
|
@@ -622,7 +630,8 @@ impl DeltaOperation { | |
Self::Create { .. } | ||
| Self::FileSystemCheck {} | ||
| Self::StreamingUpdate { .. } | ||
| Self::Write { .. } => true, | ||
| Self::Write { .. } | ||
| Self::Delete { .. } => true, | ||
} | ||
} | ||
|
||
|
@@ -641,9 +650,20 @@ impl DeltaOperation { | |
match self { | ||
// TODO add more operations | ||
Self::Write { predicate, .. } => predicate.clone(), | ||
Self::Delete { predicate, .. } => predicate.clone(), | ||
_ => None, | ||
} | ||
} | ||
|
||
/// Denotes if the operation reads the entire table | ||
pub fn read_whole_table(&self) -> bool { | ||
match self { | ||
// TODO just adding one operation example, as currently none of the | ||
// implemented operations scan the entire table. | ||
Self::Write { predicate, .. } if predicate.is_none() => false, | ||
_ => false, | ||
} | ||
} | ||
Comment on lines
+659
to
+666
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reference implementaition allows for txn to "taint" the entire table, in which case we disregrad analysing the specific actions. The conflict checker covers this behavior in tests, but I haven't investigated yet, when this is actually set. Mainly leaving this fn, as a "reminder" for subsequent PRs soon to come. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does overwrite read the entire table? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or could you provide examples of which operations will? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our operations should actually never use this directly right now. I went through the spark code base, and it seems this is invoked mostly in cases when a plan querying multiple tables with spark has a delta table set as a sink.. This usually uses generic plans and does not invoke the delta operations directly, or rather when we are not too sure what actually happened. Did not get too deep though. Personally I am a bit conflicted on how to proceed with this. on one hand it does nothing useful right now. On the other hand, we can keep it as a reminder to mindful of this. While right now I think datafusion does not yet have the concept of a sink in their plans, I believe they might be added fairly soon. At this point we may encounter situations, where a plan wants to write to delta, but we have no way of knowing what was scanned. That said, I do hope that we will be able to inspect the plan metrics and figure out what was read anyhow. Especially since we report the scanned files in out scan operator. |
||
} | ||
|
||
/// The SaveMode used when performing a DeltaOperation | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,6 @@ use crate::action::{Action, Add, DeltaOperation, Remove}; | |
use crate::operations::transaction::commit; | ||
use crate::storage::DeltaObjectStore; | ||
use crate::table_state::DeltaTableState; | ||
use crate::DeltaDataTypeVersion; | ||
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; | ||
use futures::future::BoxFuture; | ||
use futures::StreamExt; | ||
|
@@ -50,8 +49,6 @@ pub struct FileSystemCheckMetrics { | |
} | ||
|
||
struct FileSystemCheckPlan { | ||
/// Version of the snapshot provided | ||
version: DeltaDataTypeVersion, | ||
/// Delta object store for handling data files | ||
store: Arc<DeltaObjectStore>, | ||
/// Files that no longer exists in undlying ObjectStore but have active add actions | ||
|
@@ -88,7 +85,6 @@ impl FileSystemCheckBuilder { | |
async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> { | ||
let mut files_relative: HashMap<&str, &Add> = | ||
HashMap::with_capacity(self.state.files().len()); | ||
let version = self.state.version(); | ||
let store = self.store.clone(); | ||
|
||
for active in self.state.files() { | ||
|
@@ -118,14 +114,13 @@ impl FileSystemCheckBuilder { | |
|
||
Ok(FileSystemCheckPlan { | ||
files_to_remove, | ||
version, | ||
store, | ||
}) | ||
} | ||
} | ||
|
||
impl FileSystemCheckPlan { | ||
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> { | ||
pub async fn execute(self, snapshot: &DeltaTableState) -> DeltaResult<FileSystemCheckMetrics> { | ||
if self.files_to_remove.is_empty() { | ||
return Ok(FileSystemCheckMetrics { | ||
dry_run: false, | ||
|
@@ -135,8 +130,6 @@ impl FileSystemCheckPlan { | |
|
||
let mut actions = Vec::with_capacity(self.files_to_remove.len()); | ||
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len()); | ||
let version = self.version; | ||
let store = &self.store; | ||
|
||
for file in self.files_to_remove { | ||
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); | ||
|
@@ -154,10 +147,11 @@ impl FileSystemCheckPlan { | |
} | ||
|
||
commit( | ||
store, | ||
version + 1, | ||
self.store.as_ref(), | ||
&actions, | ||
DeltaOperation::FileSystemCheck {}, | ||
snapshot, | ||
// TODO pass through metadata | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I plan to do a pass through all our operations in the next Pr, where also I'll be focussing on testing the |
||
None, | ||
) | ||
.await?; | ||
|
@@ -188,7 +182,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { | |
)); | ||
} | ||
|
||
let metrics = plan.execute().await?; | ||
let metrics = plan.execute(&this.state).await?; | ||
let mut table = DeltaTable::new_with_state(this.store, this.state); | ||
table.update().await?; | ||
Ok((table, metrics)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method and
changes_data
makes me wonder whetherDeltaOperation
should instead be a trait so that we require each instance to implement.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed! If OK with you, I would defer exploring that to a follow up PR. To really get a feel for how that API should look like, I would like to work a bit more with operations that actually need the conflict resolution :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good to me!