Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do action reconciliation #456

Merged
merged 6 commits into from
Oct 13, 2021
Merged

Do action reconciliation #456

merged 6 commits into from
Oct 13, 2021

Conversation

viirya
Copy link
Contributor

@viirya viirya commented Oct 11, 2021

Description

Based on https://github.com/delta-io/delta/blob/master/PROTOCOL.md#action-reconciliation, we should do action reconciliation between add and remove action collections.

Related Issue(s)

closes #455

Documentation

rust/src/delta.rs Outdated Show resolved Hide resolved
@viirya
Copy link
Contributor Author

viirya commented Oct 11, 2021

cc @houqp

rust/src/delta.rs Outdated Show resolved Hide resolved
rust/src/delta.rs Outdated Show resolved Hide resolved
// Remove `remove` actions for the same path.
if handle_tombstones {
let mut i = 0;
while i < state.tombstones.len() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think tombstones don't need to be ordered, perhaps we could just store them as a HashSet in DeltaTableState instead of vector? Then we can avoid O(n) iteration in here as well as in the DeltaTableState::merge method. @viirya @dispanser @Dandandan @mosyp @xianwill what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, a hashset would make sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, we re-introduce the time complexity of O(n * m). We could potentially follow the same approach we do with handling remove operations, by handing over the final reconcilation of add / remove to the merge operation (see #444). This would allow for a nice symmetry of code for handling both "directions" of reconcile the same way.

In particular, checkpoints are already (by definition) reconciled, so we only need to perform these potentially expensive operations on json-based (incremental) commit files.

@houqp: Hashset is IMHO sufficient, as the order of files shouldn't matter (this is probably also true for the add actions, though).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a hashset sounds good.

Copy link
Member

@houqp houqp Oct 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially follow the same approach we do with handling remove operations, by handing over the final reconcilation of add / remove to the merge operation (see #444). This would allow for a nice symmetry of code for handling both "directions" of reconcile the same way.

@dispanser the tricky part about this is order between add and remove matters, i.e. we only want to invalidate a tombstone if the file was added after the tombstone has been created. I think we won't have access to this info inside the merge method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@houqp : the current usage for the merge method is that it is applied for each individual commit, sequentially in order. I believe I remember the delta specs to guarantee that a file is not added and removed from the same commit (this would be pointless anyway), so it will be in at most one of add / remove sets for a particular commit that is merged into the primary DeltaTableState. I think that makes it save, unless I'm missing a corner case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I tried to make tombstones as HashSet. There are a few problems, e.g., Remove must implement Hash and Eq. Although I added such traits to derive but there are more like:

565 |     pub partition_values: Option<HashMap<String, Option<String>>>,                       
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Hash` is not implemented for `HashMap<std::string::String, std::option::Option<std::string::String>>`

So I leave it as it is now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a new change to make tombstones as HashSet now. Please see if this change is okay for you. Thanks.

@houqp
Copy link
Member

houqp commented Oct 11, 2021

Thanks @viirya for taking up on the fix!

rust/src/delta.rs Outdated Show resolved Hide resolved
@dispanser
Copy link
Contributor

I'm having a closer look now. @viirya : nice find. add-remove-add cycles indeed lead to a violation of the delta protocol.

Looking at the unit test test_checkpoint_with_action_reconciliation and the resulting final checkpoint (0003) however, it contains:

+--------------------+--------+----+--------------------+--------------------+
|            metaData|protocol| txn|                 add|              remove|
+--------------------+--------+----+--------------------+--------------------+
|                null|  {1, 2}|null|                null|                null|
|{7941a1c6-89fa-49...|    null|null|                null|                null|
|                null|    null|null|                null|{a50201d9-0b29-4c...|
|                null|    null|null|{e897fedf-978d-40...|                null|
|                null|    null|null|{a50201d9-0b29-4c...|                null|
+--------------------+--------+----+--------------------+--------------------+

The delta protocol talks about the content of the checkpoint, whereas our test only captures the state after reading. The proposed patch fixes the "view" of the data (i.e., the final DeltaTableState), but the checkpoint itself still violates the protocol.

The reason for this is a bit subtle. There are three code paths that eventually hit process_acion:

  1. DeltaTable::create,
    • fixed set of actions
    • add / remove not included
    • can be ignored.
  2. DeltaTable::restore_checkpoint:
    • here, we start with a clean state, loading a checkpoint.
    • Reconcilation for adds or removes is not necessary, because the checkpoint should be consistent according to the protocol
    • this is safe
  3. DeltaTable::apply_log_from_bufread:
    • called for every version incrementally, from apply_log
    • uses a fresh, clean table state
    • because of that, process_action does not see the full table state, so reconcilation is not possible
    • full table state is only considered in the merge operation which happens for every commit independently

The proposed patch fixes the problem at 2. , but that's not solving the underlying problem. It works in so far that it loads an invalid checkpoint and makes it valid, but the bad checkpoint is already in the wild. I made a quick test with spark, and delta 1.0.0 happily accepts the meta data, so it's not that bad but of course since there's no real, backing parquet files we can't see if its interpretation is correct.

Another way to see that it's not fixing all cases: if we remove the checkpoint writing, and force delta-rs to load from json commits instead, the test starts to fail: it has the same file in adds and removes.

I would like to think about the reconcilation rules as a set of invariants for the state of the meta data, and those should be valid not on checkpoint writing, but always. A checkpoint write then produces correct data in all cases.

In my opinion the only place where all information is available is the merge operation, as it has access to both the full table state and the newly created actions from the latest commit.

@viirya
Copy link
Contributor Author

viirya commented Oct 12, 2021

Yes, when I'm moving the logic to merge, I noticed that the fix put in process_action for restore_checkpoint only fixed the view of checkpoint. We should do it in merge so to keep valid state in every commit and writing checkpoint.

@viirya
Copy link
Contributor Author

viirya commented Oct 12, 2021

Oh, I also updated the test by removing the checkpoint writing to reflect it.

@@ -191,6 +191,38 @@ mod checkpoints_with_tombstones {
assert_eq!(actions, vec![r1_updated, r2]);
}

#[tokio::test]
async fn test_checkpoint_with_action_reconciliation() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although removing checkpoint writing, I don't find other more proper place to put this test. So still leave it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps creating a new reconciliation test file? or we can move it into read_delta_test since it's effectively testing for reconciliation on table update, which was implicitly called by table commit.

we can also simplify this test with two commits, one adds then removes a file, the 2nd one adds the same file back.

Copy link
Contributor Author

@viirya viirya Oct 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to read_delta_test and simplified.

@viirya viirya changed the title Do action reconciliation when reading checkpoint Do action reconciliation Oct 12, 2021
@viirya viirya force-pushed the issue_455 branch 2 times, most recently from 9434e0a to 67e4243 Compare October 13, 2021 00:49
@viirya
Copy link
Contributor Author

viirya commented Oct 13, 2021

Does current approach look good? @houqp

@dispanser
Copy link
Contributor

Does current approach look good? @houqp

To me it indeed looks good. I made a hand-wavvy performance test with one particular big delta table that was used for #444, and I don't see any negative impact on performance. Great job!

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me over all, thanks @viirya ! Left couple minor comments.

@@ -157,6 +158,8 @@ mod checkpoints_with_tombstones {

// r2 has extended_file_metadata=false, then every tombstone should be so, even r1
assert_ne!(actions, vec![r1.clone(), r2.clone()]);
// assert!(!actions.contains(&r1));
// assert!(!actions.contains(&r2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need these two comments?

@@ -191,6 +191,38 @@ mod checkpoints_with_tombstones {
assert_eq!(actions, vec![r1_updated, r2]);
}

#[tokio::test]
async fn test_checkpoint_with_action_reconciliation() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps creating a new reconciliation test file? or we can move it into read_delta_test since it's effectively testing for reconciliation on table update, which was implicitly called by table commit.

we can also simplify this test with two commits, one adds then removes a file, the 2nd one adds the same file back.

@@ -569,6 +570,24 @@ pub struct Remove {
pub tags: Option<HashMap<String, Option<String>>>,
}

impl Hash for Remove {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(self.path.as_bytes())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice workaround :D

@@ -428,8 +428,6 @@ impl DeltaTableState {

/// merges new state information into our state
pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) {
self.files.append(&mut new_state.files);

if !new_state.tombstones.is_empty() {
let new_removals: HashSet<&str> = new_state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping we could avoid building new_removals here by storing tombstones in a hashset, but forgot we are only checking for path here :( It looks like in this case we don't get any benefit by changing from Vec to HashSet, that's a bummer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I tried some hacks, but still not work. No better idea so far.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's not easy, i will file a follow up issue for this so we can discuss potential solutions there.


if !new_state.files.is_empty() {
let new_adds: HashSet<&str> = new_state.files.iter().map(|s| s.path.as_str()).collect();
self.tombstones
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we also hide this tombstones filter behind require_tombstones feature flag?

}

if !new_state.files.is_empty() {
let new_adds: HashSet<&str> = new_state.files.iter().map(|s| s.path.as_str()).collect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, I was hoping a HasSet could help avoid the new_adds hashset here. I wish there is a way to check for existence by providing the hashed value.

@houqp
Copy link
Member

houqp commented Oct 13, 2021

Good summary @dispanser . I we now have an implicit rule on DeltaTableState: The state has be to restructured from either a checkpoint or a single commit. In theory, a user could write code to call process_action on a state using actions gathered from multiple commits, which could result in corrupted/unreconsilable state.

As a future improvement, I think there are type tricks in rust we can apply to make sure DeltaTableState is only reconstructed from a single checkpoint or commit and becomes immutable after all actions have been applied. So we can prevent incorrect use of process_action at build time.

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @viirya !

@houqp houqp enabled auto-merge (squash) October 13, 2021 17:36
@houqp houqp merged commit ea22346 into delta-io:main Oct 13, 2021
@viirya
Copy link
Contributor Author

viirya commented Oct 13, 2021

Thanks @houqp @dispanser @Dandandan !

@houqp
Copy link
Member

houqp commented Oct 13, 2021

Thank you @viirya for taking on this fix, lots of good discussions came out of it :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add back removed file doesn't update tombstone
4 participants