Commit aca367c
Reword to clarify about pruning delete files in iceberg metadata instead of delete from file system
Steve Zhang committed Feb 14, 2024
1 parent 9b27236 commit aca367c
Showing 5 changed files with 29 additions and 15 deletions.
File 1 of 5
```diff
@@ -100,16 +100,15 @@ public interface RewriteDataFiles
   boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
 
   /**
-   * Removes dangling delete files from current snapshot after compaction. A delete file is
+   * Deletes dangling delete files from current snapshot after compaction. A delete file is
    * considered dangling if it does not apply to any non-expired data file.
    *
-   * <p>Dangling delete will be removed in a new snapshot after compaction with delete operation
-   *
-   * <p>Pruning apply to both positional delete and equality delete based on data sequence number
+   * <p>Dangling delete files will be pruned from iceberg metadata. Pruning apply to both position
+   * delete and equality delete based on data sequence number
    *
    * <p>Defaults to true.
    */
-  String REMOVE_DANGLING_DELETE = "remove-dangling-deletes";
+  String DELETE_DANGLING_DELETE = "delete-dangling-deletes";
 
   boolean REMOVE_DANGLING_DELETE_DEFAULT = true;
 
```
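Note: for orientation, a minimal usage sketch of the renamed option against the Spark action API, assuming an existing SparkSession `spark` and a loaded `Table` `table` (it mirrors the option calls in the tests below; it is not part of the commit):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

// Compact data files, then prune dangling delete files from table metadata.
// Passing "true" is redundant here since REMOVE_DANGLING_DELETE_DEFAULT is true.
RewriteDataFiles.Result result =
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "true")
        .execute();
System.out.println(result.removedDeleteFilesCount()); // delete files pruned from metadata
```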
File 2 of 5
```diff
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
+import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
```
```diff
@@ -904,7 +906,12 @@ private Map<String, String> snapshotSummary() {
   }
 
   private Map<String, String> snapshotSummary(TableIdentifier tableIdentifier) {
-    return validationCatalog.loadTable(tableIdentifier).currentSnapshot().summary();
+    Table table = validationCatalog.loadTable(tableIdentifier);
+    if (DataOperations.DELETE.equals(table.currentSnapshot().operation())) {
+      return table.snapshot(table.currentSnapshot().parentId()).summary();
+    } else {
+      return table.currentSnapshot().summary();
+    }
   }
 
   private List<Object[]> currentData() {
```
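Note: the helper change above exists because removing dangling deletes commits a second snapshot on top of the compaction: a replace for the rewrite itself, then a metadata-only delete for the prune. A sketch of the assumed snapshot sequence (names are the standard `org.apache.iceberg.DataOperations` constants):

```java
// Assumed snapshot log after a rewrite with dangling-delete removal enabled:
// ... -> "replace" (compaction) -> "delete" (metadata-only prune of dangling deletes)
table.refresh();
Snapshot pruning = table.currentSnapshot();
if (DataOperations.DELETE.equals(pruning.operation())) {
  Snapshot compaction = table.snapshot(pruning.parentId());
  // compaction.operation() is expected to be DataOperations.REPLACE;
  // its summary is the one the rewrite assertions care about.
}
```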
File 3 of 5
```diff
@@ -98,7 +98,7 @@ public class RewriteDataFilesSparkAction
           PARTIAL_PROGRESS_MAX_COMMITS,
           TARGET_FILE_SIZE_BYTES,
           USE_STARTING_SEQUENCE_NUMBER,
-          REMOVE_DANGLING_DELETE,
+          DELETE_DANGLING_DELETE,
           REWRITE_JOB_ORDER);
 
   private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
```
```diff
@@ -566,7 +566,7 @@ void validateAndInitOptions() {
 
     removeDanglingDeletesEnabled =
         PropertyUtil.propertyAsBoolean(
-            options(), REMOVE_DANGLING_DELETE, REMOVE_DANGLING_DELETE_DEFAULT);
+            options(), DELETE_DANGLING_DELETE, REMOVE_DANGLING_DELETE_DEFAULT);
 
     rewriteJobOrder =
         RewriteJobOrder.fromName(
```
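Note: a small sketch of how the boolean option resolves, assuming the standard `PropertyUtil` helper from `org.apache.iceberg.util` (the literal map stands in for the action's `options()`):

```java
import java.util.Map;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

Map<String, String> options =
    ImmutableMap.of(RewriteDataFiles.DELETE_DANGLING_DELETE, "false");

// An explicit "false" wins; with no entry, the default (true) applies.
boolean enabled =
    PropertyUtil.propertyAsBoolean(
        options,
        RewriteDataFiles.DELETE_DANGLING_DELETE,
        RewriteDataFiles.REMOVE_DANGLING_DELETE_DEFAULT);
```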
File 4 of 5
```diff
@@ -380,7 +380,7 @@ public void testRewriteWithDanglingEqualityDeletesDropped() {
             .rewriteDataFiles(table)
             .filter(partitionFilter)
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.REMOVE_DANGLING_DELETE, "false")
+            .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "false")
             .execute();
     assertThat(firstRewrite.removedDeleteFilesCount()).isZero();
     assertThat(firstRewrite.rewrittenDataFilesCount()).isEqualTo(2);
```
```diff
@@ -394,7 +394,7 @@ public void testRewriteWithDanglingEqualityDeletesDropped() {
             .rewriteDataFiles(table)
             .filter(partitionFilter)
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.REMOVE_DANGLING_DELETE, "true")
+            .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "true")
             .execute();
     assertThat(secondRewrite.removedDeleteFilesCount()).isOne();
     assertThat(secondRewrite.rewrittenDataFilesCount()).isOne();
```
```diff
@@ -442,7 +442,7 @@ public void testRewriteWithDanglingPositionalDeletesDropped() {
             .rewriteDataFiles(table)
             .filter(partitionFilter)
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.REMOVE_DANGLING_DELETE, "false")
+            .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "false")
             .execute();
 
     assertThat(firstRewrite.removedDeleteFilesCount()).isZero();
```
```diff
@@ -455,7 +455,7 @@ public void testRewriteWithDanglingPositionalDeletesDropped() {
             .rewriteDataFiles(table)
             .filter(partitionFilter)
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.REMOVE_DANGLING_DELETE, "true")
+            .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "true")
             .execute();
 
     assertThat(secondRewrite.removedDeleteFilesCount()).isOne();
```
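Note: besides the result counts asserted here, the prune should also surface in the pruning snapshot's summary; a hedged sketch, assuming the standard `SnapshotSummary` key `"removed-delete-files"`:

```java
// The metadata-only prune is assumed to report removed delete files in its summary.
Map<String, String> summary = table.currentSnapshot().summary();
assertThat(summary).containsKey(SnapshotSummary.REMOVED_DELETE_FILES_PROP);
```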
```diff
@@ -1234,7 +1234,7 @@ public void testSortCustomSortOrderRequiresRepartition() {
         basicRewrite(table)
             .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.REMOVE_DANGLING_DELETE, "false")
+            .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "false")
             .option(
                 RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
                 Integer.toString(averageFileSize(table) / partitions))
```
```diff
@@ -1885,7 +1885,10 @@ private List<DeleteFile> writePosDeletes(
         table
             .io()
             .newOutputFile(
-                table.locationProvider().newDataLocation(UUID.randomUUID() + ".parquet"));
+                table
+                    .locationProvider()
+                    .newDataLocation(
+                        FileFormat.PARQUET.addExtension(UUID.randomUUID().toString())));
     EncryptedOutputFile encryptedOutputFile =
         EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY);
 
```
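Note: on the refactor just above, `FileFormat.addExtension` is assumed to produce the same `.parquet`-suffixed name while skipping the append when the extension is already present:

```java
// Assumed contract of org.apache.iceberg.FileFormat#addExtension (illustrative names):
FileFormat.PARQUET.addExtension("f47ac10b-58cc-4372-a567-0e02b2c3d479");
// -> "f47ac10b-58cc-4372-a567-0e02b2c3d479.parquet"
FileFormat.PARQUET.addExtension("part-00000.parquet");
// -> "part-00000.parquet" (already suffixed, left unchanged)
```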
File 5 of 5
```diff
@@ -53,6 +53,7 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
 import org.apache.iceberg.actions.SizeBasedFileRewriter;
```
```diff
@@ -345,6 +346,7 @@ public void testRemoveDanglingDeletes() throws Exception {
     SparkActions.get(spark)
         .rewriteDataFiles(table)
         .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "false")
         .execute();
 
     Result result =
```
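Note: this test and the two below pin the option to "false" because dangling-delete removal now defaults to true; without it, the data-file compaction would already prune the dangling deletes, leaving nothing for the RewritePositionDeleteFiles action under test.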
```diff
@@ -387,6 +389,7 @@ public void testSomePartitionsDanglingDeletes() throws Exception {
         .rewriteDataFiles(table)
         .filter(filter)
         .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "false")
         .execute();
 
     Result result =
```
```diff
@@ -439,6 +442,7 @@ public void testRewriteFilterRemoveDangling() throws Exception {
     SparkActions.get(spark)
         .rewriteDataFiles(table)
         .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+        .option(RewriteDataFiles.DELETE_DANGLING_DELETE, "false")
         .execute();
 
     Expression filter = Expressions.or(Expressions.equal("c1", 0), Expressions.equal("c1", 1));
```
```diff
@@ -797,6 +801,7 @@ private void writePosDeletesForFiles(
         files.stream().collect(Collectors.groupingBy(ContentFile::partition));
     List<DeleteFile> deleteFiles =
         Lists.newArrayListWithCapacity(deleteFilesPerPartition * filesByPartition.size());
+    String suffix = String.format(".%s", FileFormat.PARQUET.name().toLowerCase());
 
     for (Map.Entry<StructLike, List<DataFile>> filesByPartitionEntry :
         filesByPartition.entrySet()) {
```
```diff
@@ -821,7 +826,7 @@ private void writePosDeletesForFiles(
         if (counter == deleteFileSize) {
           // Dump to file and reset variables
           OutputFile output =
-              Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
+              Files.localOutput(File.createTempFile("junit", suffix, temp.toFile()));
           deleteFiles.add(FileHelpers.writeDeleteFile(table, output, partition, deletes).first());
           counter = 0;
           deletes.clear();
```
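Note: the suffix matters because `File.createTempFile` substitutes `.tmp` when the suffix argument is null (standard `java.io` behavior), whereas these delete files are written as Parquet. A sketch of the difference, with `temp` being the test's JUnit temp directory:

```java
// ".parquet", derived exactly as in the hunk above
String suffix = String.format(".%s", FileFormat.PARQUET.name().toLowerCase());
File withExt = File.createTempFile("junit", suffix, temp.toFile());  // junit....parquet
File withoutExt = File.createTempFile("junit", null, temp.toFile()); // junit....tmp
```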
