Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do action reconciliation #456

Merged
merged 6 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,6 @@ impl DeltaTableState {

/// merges new state information into our state
pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) {
self.files.append(&mut new_state.files);

if !new_state.tombstones.is_empty() {
let new_removals: HashSet<&str> = new_state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping we could avoid building new_removals here by storing tombstones in a hashset, but forgot we are only checking for path here :( It looks like in this case we don't get any benefit by changing from Vec to HashSet, that's a bummer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I tried some hacks, but still not work. No better idea so far.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's not easy, i will file a follow up issue for this so we can discuss potential solutions there.

.tombstones
Expand All @@ -445,6 +443,14 @@ impl DeltaTableState {
self.tombstones.append(&mut new_state.tombstones);
}

if !new_state.files.is_empty() {
let new_adds: HashSet<&str> = new_state.files.iter().map(|s| s.path.as_str()).collect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, I was hoping a HasSet could help avoid the new_adds hashset here. I wish there is a way to check for existence by providing the hashed value.

self.tombstones
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we also hide this tombstones filter behind require_tombstones feature flag?

.retain(|a| !new_adds.contains(a.path.as_str()));
}

self.files.append(&mut new_state.files);

if new_state.min_reader_version > 0 {
self.min_reader_version = new_state.min_reader_version;
self.min_writer_version = new_state.min_writer_version;
Expand Down Expand Up @@ -766,6 +772,7 @@ impl DeltaTable {
let checkpoint_data_paths = self.get_checkpoint_data_paths(&check_point);
// process actions from checkpoint
self.state = DeltaTableState::default();

for f in &checkpoint_data_paths {
let obj = self.storage.get_obj(f).await?;
let preader = SerializedFileReader::new(SliceableCursor::new(obj))?;
Expand Down
32 changes: 32 additions & 0 deletions rust/tests/checkpoint_writer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,38 @@ mod checkpoints_with_tombstones {
assert_eq!(actions, vec![r1_updated, r2]);
}

#[tokio::test]
async fn test_checkpoint_with_action_reconciliation() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although removing checkpoint writing, I don't find other more proper place to put this test. So still leave it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps creating a new reconciliation test file? or we can move it into read_delta_test since it's effectively testing for reconciliation on table update, which was implicitly called by table commit.

we can also simplify this test with two commits, one adds then removes a file, the 2nd one adds the same file back.

Copy link
Contributor Author

@viirya viirya Oct 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to read_delta_test and simplified.

let path = "./tests/data/checkpoints_tombstones/action_reconciliation";
let mut table = create_table(path, None).await;

let a1 = add(3 * 60 * 1000);
assert_eq!(1, commit_add(&mut table, &a1).await);
assert_eq!(table.get_files(), vec![a1.path.as_str()]);

// Remove added file.
let (_, opt1) = pseudo_optimize(&mut table, 5 * 59 * 1000).await;
assert_eq!(table.get_files(), vec![opt1.path.as_str()]);
assert_eq!(
table
.get_state()
.all_tombstones()
.iter()
.map(|r| r.path.as_str())
.collect::<Vec<_>>(),
vec![a1.path.as_str()]
);

// Add removed file back.
assert_eq!(3, commit_add(&mut table, &a1).await);
assert_eq!(
table.get_files(),
vec![opt1.path.as_str(), a1.path.as_str()]
);
// tombstone is removed.
assert_eq!(table.get_state().all_tombstones().len(), 0);
}

async fn create_table(
path: &str,
config: Option<HashMap<String, Option<String>>>,
Expand Down