Core, Spark: Remove dangling deletes as part of RewriteDataFilesAction #9724

Open: dramaticlly wants to merge 4 commits into main from danglingDeletes

Conversation

@dramaticlly (Contributor) commented Feb 13, 2024

Goal: Clean up dangling deletes as part of the Spark RewriteDataFilesAction. This is controlled by the feature flag remove-dangling-deletes, which is turned on by default. Most of the code comes from #6581. The problem statement and design doc explaining why we need it are here: https://docs.google.com/document/d/11d-cIUR_89kRsMmWnEoxXGZCvp7L4TUmPJqUC60zB5M/edit#
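For illustration, a minimal sketch of how the flag might be used from the Spark actions API (the option name comes from the description above; the exact wiring in this PR may differ, and the table name is hypothetical):

    Table table = ...;  // the Iceberg table to compact, e.g. loaded via the catalog
    RewriteDataFiles.Result result =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .filter(Expressions.equal("category", "books"))  // optional filter on the base table schema
            .option("remove-dangling-deletes", "true")       // feature flag added by this PR
            .execute();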

Changes

  • DeleteFiles extended to remove a given DeleteFile (see the sketch after this list)
  • RewriteDataFiles.Result now provides a count of the dangling delete files removed
  • The withReusableDS() function moved from RewriteManifestsSparkAction to the base action so it can be reused in RewriteDataFilesAction.
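A rough sketch of the first bullet's API addition (the exact method name and javadoc may differ from the PR; note that review feedback below asks to use RewriteFiles instead and revert this change):

    public interface DeleteFiles extends SnapshotUpdate<DeleteFiles> {
      // existing API: remove a data file by path
      DeleteFiles deleteFile(CharSequence path);

      // sketched addition: remove a specific delete file
      DeleteFiles deleteFile(DeleteFile file);
    }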

TODO: figure out predicate pushdown for the entries metadata table.

  • rewriteDataFiles today supports a custom expression filter, which is targeted against the base table schema.
  • The entries metadata table used for identifying the dangling deletes has a different schema than the base table; the partition filter can be translated, but it is not clear how to translate filters applied to other, non-partitioned columns (illustrated below).
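For example, a hedged illustration in the Spark API (table and column names are hypothetical; it assumes the base table is partitioned by category):

    // A base-table filter such as Expressions.equal("category", "books") maps onto
    // the partition struct exposed by the entries metadata table:
    Dataset<Row> matchingEntries =
        spark.table("db.tbl.entries")
            .filter(col("data_file.partition.category").equalTo("books"));
    // A filter on a non-partition data column (say, price > 10) has no direct
    // counterpart in the entries schema, which is the open question above.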

@dramaticlly (Contributor, Author)

@szehon-ho can I ask for your eyes first?

@dramaticlly force-pushed the danglingDeletes branch 5 times, most recently from aca367c to abe98de on February 15, 2024 00:13
@dramaticlly marked this pull request as ready for review on February 20, 2024 18:11
@szehon-ho (Collaborator) left a comment

Hi @dramaticlly thanks for doing this. Can you put me as a co-author as most of the code is from #6581 ?

And as that is the case, it would be nice if @aokolnychyi takes a look as well

* <p>
*
* <ul>
* <li>If remove-dangling-deletes=metadata, then dangling delete files will be pruned from iceberg
Collaborator

maybe we can remove 'remove-dangling-deletes='? It's a bit repetitive.

i.e.,

metadata: dangling delete files will be pruned from...

*
* <p>
*/
public enum RemoveDanglingDeletesMode {
Collaborator

@aokolnychyi the thought here is that we will have further modes: STATS (for partition file stats), FULL (run a whole rewritePositionDeletes job).

Default is NONE because it does create one more snapshot and might break people who depend on listing the snapshots for information.
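A sketch of what those modes could look like, based on the discussion above (illustrative only; the enum was later dropped from the PR):

    public enum RemoveDanglingDeletesMode {
      NONE,     // default: leave dangling deletes alone (no extra snapshot)
      METADATA, // prune dangling delete files by scanning table metadata
      STATS,    // future idea: rely on partition file stats instead of a metadata scan
      FULL      // future idea: run a whole rewritePositionDeletes job
    }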

Contributor

I am not sure we need this enum to be honest. The decision to use partition stats instead of scanning should be done by Iceberg, not users. If we detect there is a viable partition stats file, we should always use it, instead of scanning the metadata. Also, the FULL mode seems a bit awkward as it would actually rewrite deletes, rather than drop dangling.

I'd not add it for now and see if we want to reconsider this decision later.

* <p>
*
* <ul>
* <li>If remove-dangling-deletes=metadata, then dangling delete files will be pruned from
Collaborator

Same small comment as on the javadoc above (remove 'remove-dangling-deletes=' as it's too repetitive).

i.e.,

metadata: dangling delete files will be pruned from...

// Replace the hyphen in order name with underscore to map to the enum value. For example:
// rewrite-position to REWRITE_POSITION
try {
return RemoveDanglingDeletesMode.valueOf(
Collaborator

this is not needed anymore, right? (the enum values have no hyphen)

Contributor Author

Yes, you are right. Initially I thought of adding rewrite-position so that we could trigger the rewrite position deletes Spark action, but that was removed later. I guess I can remove this now and add it back later.
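For reference, a minimal sketch of what remains once the hyphen handling is dropped (variable name is illustrative):

    // enum constants contain no hyphen, so a plain valueOf on the upper-cased value is enough
    RemoveDanglingDeletesMode mode =
        RemoveDanglingDeletesMode.valueOf(value.toUpperCase(Locale.ROOT));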

return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
}

private List<DeleteFile> removeDanglingDeletes() {
Collaborator

Do you think we can move this to a separate, package-protected class like RemoveDanglingDeleteSparkAction? (for better code readability)

Contributor

The logic is non-trivial here and is off by default. I'd probably move it into a separate action; the existing action is already complicated. If so, I am not sure we even have to call it from rewrite data files. If we ask the user to pass a property explicitly, I'd prefer separating the two actions and having a dedicated procedure.

@aokolnychyi (Contributor)

I should have time to take a look this week.

@aokolnychyi (Contributor) left a comment

Okay, I did one pass and here are my high-level notes:

  • We should use RewriteFiles instead of DeleteFiles, changes in DeleteFiles should be reverted.
  • I don't see a need for the enum to control the cleanup mode.
  • I'd consider having a separate action but I can be convinced otherwise. Especially, given that we may account for partition stats in the future.
  • I'd consider the following algorithm (a rough sketch follows this list):
    • Extend data_files and delete_files metadata tables to include data sequence numbers, if needed. I don't remember if we already populate them. This should be trivial as each DeleteFile object already has this info.
    • Query data_files, aggregate, compute min data sequence number per partition. Don't cache the computed result, just keep a reference to it.
    • Query delete_files, potentially projecting only strictly required columns.
    • Join the summary with delete_files on the spec ID and partition. Find delete files that can be discarded in one go by having a predicate that accounts for the delete type (position vs equality).
    • Collect the result to the driver and use SparkDeleteFile to wrap Spark rows as valid delete files. See the action for rewriting manifests for an example.
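A rough sketch of those steps in the Spark Java API (column names, content codes, and variable names are illustrative, not the PR's exact code):

    // aggregate the min data sequence number per spec/partition from data_files
    Dataset<Row> minSeqByPartition =
        spark.table("db.tbl.data_files")
            .groupBy(col("spec_id"), col("partition"))
            .agg(min(col("data_sequence_number")).as("min_data_sequence_number"));

    // join delete_files against the summary and keep only dangling deletes:
    // position deletes (content = 1) strictly below the partition's min data
    // sequence number, equality deletes (content = 2) at or below it
    Dataset<Row> deleteFiles = spark.table("db.tbl.delete_files");
    Dataset<Row> dangling =
        deleteFiles
            .join(
                minSeqByPartition,
                deleteFiles.col("spec_id").equalTo(minSeqByPartition.col("spec_id"))
                    .and(deleteFiles.col("partition").eqNullSafe(minSeqByPartition.col("partition"))),
                "left")
            .filter(
                col("content").equalTo(1)
                    .and(col("data_sequence_number").$less(col("min_data_sequence_number")))
                    .or(
                        col("content").equalTo(2)
                            .and(col("data_sequence_number").$less$eq(col("min_data_sequence_number")))));

    // collect to the driver and wrap each row as a DeleteFile (e.g. via SparkDeleteFile)
    List<Row> danglingRows = dangling.collectAsList();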

api/src/main/java/org/apache/iceberg/DeleteFiles.java (review comment outdated and resolved)

@dramaticlly (Contributor, Author) commented Feb 27, 2024

(quoting @aokolnychyi's high-level review notes above)

Based on Anton's feedback, I will try to divide the changes into 2 PRs, where the first PR (#9813) adds data sequence number support to the data and delete files metadata tables. Once that is merged, I will update this PR to scan data_files first to aggregate the per-spec/partition min data sequence number, then compare against delete_files. With a left join, we can identify dangling deletes and remove them in one pass. SparkDeleteFile will be used to convert Spark rows back to POJOs used for pruning, in consideration of partition evolution. Finally, dangling deletes will be removed by reconstruction instead of by file path, to benefit from manifest pruning when the Iceberg table is scanned.
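For the last point, a minimal sketch of committing the removal through RewriteFiles with the reconstructed delete files (assuming danglingDeletes is the collected list; the PR's commit path may differ):

    RewriteFiles rewriteFiles = table.newRewrite();
    danglingDeletes.forEach(rewriteFiles::deleteFile);  // RewriteFiles.deleteFile(DeleteFile)
    rewriteFiles.commit();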

@zinking (Contributor) commented Feb 27, 2024

"Finally, dangling delete will be removed by reconstruction instead of by file path, to benefit manifest pruning when iceberg table was scanned."

I guess only partitionData and path are needed; the others are all unused.

Optionally can be enabled as part of RewriteDataFilesSparkAction

Co-authored-by: Szehon Ho <szehon.apache@gmail.com>
@dramaticlly (Contributor, Author)

With the merge of #10203, I refactored the algorithm a bit: it now scans the entries table to compute minSequenceNumberPerPartitionAndSpec and reads the delete files table for the data sequence number, instead of relying on data sequence numbers as virtual columns. I also identified and fixed the problem in the partition evolution tests, so that is now all handled correctly. Would you like to take another look? @szehon-ho @aokolnychyi

@szehon-ho (Collaborator) left a comment

Looks mostly good to me, did a review round on the code portion.

commit(rewriteFiles);
}

return new RemoveDanglingDeleteFilesActionResult(danglingDeletes);
Collaborator

Would it make sense to have just a count here?

One reason being: if the user keeps a handle on the Result for some reason, the list won't stay in memory (it can be GC'ed after this method).

Contributor Author

I actually exposed the count in RewriteDataFiles.Result but kept the actual collection here, as it makes it easier to verify in tests that the right files get removed. Would switching from List to Iterable help in terms of memory?
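To make the trade-off concrete, a possible shape for the result (names are illustrative; the PR's generated immutable interface may differ):

    @Value.Immutable
    interface Result {
      // kept so tests can verify exactly which delete files were removed;
      // switched from List to Iterable per the review discussion
      Iterable<DeleteFile> removedDeleteFiles();

      // the count surfaced through RewriteDataFiles.Result
      default int removedDeleteFilesCount() {
        return Iterables.size(removedDeleteFiles());
      }
    }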

- Rewording documentation and add more comments
- Changed removed deletes in results from List to Iterable to save on memory
- Added BaseRemoveDanglingDeleteFiles and generate Immutable implementation
@szehon-ho (Collaborator) left a comment

Thanks @dramaticlly! Left some more minor comments, mostly in the doc part as it's a bit of a complex algorithm, but also some others.

.or(
    col("data_file.content")
        .equalTo("2")
        .and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));
Collaborator

Actually, I just read in the spec that an equality delete in an unpartitioned spec is a GLOBAL delete. Will we remove that one by mistake, and can we add a test for that?

Contributor Author

I think we check whether the table is partitioned very early and return early, as ManifestFilterManager will handle this for unpartitioned tables. Do you think that would be sufficient, or do you want a partition evolution test that moves a partitioned table to an unpartitioned one with a dangling equality delete?
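A sketch of the early-return check mentioned here (the exact condition and return value are assumptions, not the PR's code):

    // unpartitioned specs make equality deletes global; skip the dangling-delete pass
    // and leave those files to ManifestFilterManager
    if (table.specs().values().stream().allMatch(PartitionSpec::isUnpartitioned)) {
      return ImmutableList.of();
    }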

.join(minSequenceNumberByPartition, joinOnPartition, "left")
.filter(filterOnDanglingDeletes)
.select("data_file.*");
return danglingDeletes.collectAsList().stream()
Collaborator

curious, is this something we can do on executors:

 deleteEntries
            .join(minSequenceNumberByPartition, joinOnPartition, "left")
            .filter(filterOnDanglingDeletes)
            .select("data_file.*")
            .map(row -> deleteFileWrapper(danglingDeletes.schema(), row))

?

@dramaticlly (Contributor, Author) commented Aug 20, 2024

Oh, I think we actually need to collect to the driver and apply the conversion there; the reason is that SparkDeleteFile is not serializable, and @aokolnychyi suggested wrapping after collecting to the driver. I also added a comment to clarify.
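In other words, a sketch of the driver-side wrapping (assuming the deleteFileWrapper helper from the snippet above):

    // SparkDeleteFile is not serializable, so wrap rows on the driver after
    // collectAsList() rather than in a Dataset.map on the executors
    List<DeleteFile> removedDeleteFiles =
        danglingDeletes.collectAsList().stream()
            .map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
            .collect(Collectors.toList());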

- Instantiate spark action without clone session
- Update javadoc to use html order list
- Inline resultBuilder in RewriteDataFilesSparkAction