Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Iceberg deletes when an entire file can be removed #14198

Merged
merged 1 commit into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class CommitTaskData
private final Optional<String> partitionDataJson;
private final FileContent content;
private final Optional<String> referencedDataFile;
private final Optional<Long> fileRecordCount;
private final Optional<Long> deletedRowCount;

@JsonCreator
public CommitTaskData(
Expand All @@ -42,7 +44,9 @@ public CommitTaskData(
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("content") FileContent content,
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile)
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile,
@JsonProperty("fileRecordCount") Optional<Long> fileRecordCount,
@JsonProperty("deletedRowCount") Optional<Long> deletedRowCount)
{
this.path = requireNonNull(path, "path is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
Expand All @@ -52,6 +56,11 @@ public CommitTaskData(
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.content = requireNonNull(content, "content is null");
this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null");
this.fileRecordCount = requireNonNull(fileRecordCount, "fileRecordCount is null");
fileRecordCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "fileRecordCount cannot be negative"));
this.deletedRowCount = requireNonNull(deletedRowCount, "deletedRowCount is null");
deletedRowCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "deletedRowCount cannot be negative"));
checkArgument(fileRecordCount.isPresent() == deletedRowCount.isPresent(), "fileRecordCount and deletedRowCount must be specified together");
checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
}

Expand Down Expand Up @@ -102,4 +111,16 @@ public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}

@JsonProperty
public Optional<Long> getFileRecordCount()
{
return fileRecordCount;
}

@JsonProperty
public Optional<Long> getDeletedRowCount()
{
return deletedRowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class IcebergColumnHandle
public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
public static final String TRINO_ROW_ID_NAME = "$row_id";

public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 2;
public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 3;
public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 4;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ public void storeMergedRows(Page page)

int index = position;
FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, ignored -> {
int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(2), index));
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(3), index).toStringUtf8();
return new FileDeletion(partitionSpecId, partitionData);
long fileRecordCount = BIGINT.getLong(rowIdRow.getField(2), index);
int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(3), index));
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(4), index).toStringUtf8();
return new FileDeletion(partitionSpecId, partitionData, fileRecordCount);
});

deletion.rowsToDelete().addLong(rowPosition);
Expand All @@ -129,7 +130,8 @@ public CompletableFuture<Collection<Slice>> finish()
ConnectorPageSink sink = createPositionDeletePageSink(
dataFilePath.toStringUtf8(),
partitionsSpecs.get(deletion.partitionSpecId()),
deletion.partitionDataJson());
deletion.partitionDataJson(),
deletion.fileRecordCount());

fragments.addAll(writePositionDeletes(sink, deletion.rowsToDelete()));
});
Expand All @@ -143,7 +145,7 @@ public void abort()
insertPageSink.abort();
}

private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson)
private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson, long fileRecordCount)
{
Optional<PartitionData> partitionData = Optional.empty();
if (partitionSpec.isPartitioned()) {
Expand All @@ -163,7 +165,8 @@ private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, Part
jsonCodec,
session,
fileFormat,
storageProperties);
storageProperties,
fileRecordCount);
}

private static Collection<Slice> writePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete)
Expand Down Expand Up @@ -201,12 +204,14 @@ private static class FileDeletion
{
private final int partitionSpecId;
private final String partitionDataJson;
private final long fileRecordCount;
private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();

public FileDeletion(int partitionSpecId, String partitionDataJson)
public FileDeletion(int partitionSpecId, String partitionDataJson, long fileRecordCount)
{
this.partitionSpecId = partitionSpecId;
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.fileRecordCount = fileRecordCount;
}

public int partitionSpecId()
Expand All @@ -219,6 +224,11 @@ public String partitionDataJson()
return partitionDataJson;
}

public long fileRecordCount()
{
return fileRecordCount;
}

public LongBitmapDataProvider rowsToDelete()
{
return rowsToDelete;
Expand Down
Loading