Skip to content

Commit

Permalink
Do not use metadata deletes during Iceberg writes
Browse files Browse the repository at this point in the history
The DeleteFiles operation does not validate conflicts the way RowDelta
does. If a file is removed during a write operation, this can
result in an update operation being reflected as an insert.

As a follow-up, this optimization could be restored using the
OverwriteFiles API.
  • Loading branch information
alexjo2144 authored and findepi committed Aug 9, 2023
1 parent 08a7d09 commit c6dfe27
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 235 deletions.
6 changes: 6 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-blackhole</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class CommitTaskData
Expand All @@ -32,8 +31,6 @@ public class CommitTaskData
private final Optional<String> partitionDataJson;
private final FileContent content;
private final Optional<String> referencedDataFile;
private final Optional<Long> fileRecordCount;
private final Optional<Long> deletedRowCount;

@JsonCreator
public CommitTaskData(
Expand All @@ -44,9 +41,7 @@ public CommitTaskData(
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("content") FileContent content,
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile,
@JsonProperty("fileRecordCount") Optional<Long> fileRecordCount,
@JsonProperty("deletedRowCount") Optional<Long> deletedRowCount)
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile)
{
this.path = requireNonNull(path, "path is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
Expand All @@ -56,12 +51,6 @@ public CommitTaskData(
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.content = requireNonNull(content, "content is null");
this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null");
this.fileRecordCount = requireNonNull(fileRecordCount, "fileRecordCount is null");
fileRecordCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "fileRecordCount cannot be negative"));
this.deletedRowCount = requireNonNull(deletedRowCount, "deletedRowCount is null");
deletedRowCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "deletedRowCount cannot be negative"));
checkArgument(fileRecordCount.isPresent() == deletedRowCount.isPresent(), "fileRecordCount and deletedRowCount must be specified together");
checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
}

@JsonProperty
Expand Down Expand Up @@ -111,16 +100,4 @@ public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}

@JsonProperty
public Optional<Long> getFileRecordCount()
{
return fileRecordCount;
}

@JsonProperty
public Optional<Long> getDeletedRowCount()
{
return deletedRowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ public class IcebergColumnHandle
public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
public static final String TRINO_ROW_ID_NAME = "$row_id";

public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 2;
public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 3;
public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 4;
public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2;
public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 3;

private final ColumnIdentity baseColumnIdentity;
private final Type baseType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,9 @@ public void storeMergedRows(Page page)

int index = position;
FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, ignored -> {
long fileRecordCount = BIGINT.getLong(rowIdRow.getField(2), index);
int partitionSpecId = INTEGER.getInt(rowIdRow.getField(3), index);
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(4), index).toStringUtf8();
return new FileDeletion(partitionSpecId, partitionData, fileRecordCount);
int partitionSpecId = INTEGER.getInt(rowIdRow.getField(2), index);
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(3), index).toStringUtf8();
return new FileDeletion(partitionSpecId, partitionData);
});

deletion.rowsToDelete().addLong(rowPosition);
Expand All @@ -129,8 +128,7 @@ public CompletableFuture<Collection<Slice>> finish()
ConnectorPageSink sink = createPositionDeletePageSink(
dataFilePath.toStringUtf8(),
partitionsSpecs.get(deletion.partitionSpecId()),
deletion.partitionDataJson(),
deletion.fileRecordCount());
deletion.partitionDataJson());

fragments.addAll(writePositionDeletes(sink, deletion.rowsToDelete()));
});
Expand All @@ -144,7 +142,7 @@ public void abort()
insertPageSink.abort();
}

private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson, long fileRecordCount)
private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson)
{
Optional<PartitionData> partitionData = Optional.empty();
if (partitionSpec.isPartitioned()) {
Expand All @@ -164,8 +162,7 @@ private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, Part
jsonCodec,
session,
fileFormat,
storageProperties,
fileRecordCount);
storageProperties);
}

private static Collection<Slice> writePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete)
Expand Down Expand Up @@ -203,14 +200,12 @@ private static class FileDeletion
{
private final int partitionSpecId;
private final String partitionDataJson;
private final long fileRecordCount;
private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();

public FileDeletion(int partitionSpecId, String partitionDataJson, long fileRecordCount)
public FileDeletion(int partitionSpecId, String partitionDataJson)
{
this.partitionSpecId = partitionSpecId;
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.fileRecordCount = fileRecordCount;
}

public int partitionSpecId()
Expand All @@ -223,11 +218,6 @@ public String partitionDataJson()
return partitionDataJson;
}

public long fileRecordCount()
{
return fileRecordCount;
}

public LongBitmapDataProvider rowsToDelete()
{
return rowsToDelete;
Expand Down
Loading

0 comments on commit c6dfe27

Please sign in to comment.