Improve Iceberg deletes when an entire file can be removed #12197

Merged · 1 commit · May 25, 2022
@@ -32,6 +32,8 @@ public class CommitTaskData
private final Optional<String> partitionDataJson;
private final FileContent content;
private final Optional<String> referencedDataFile;
private final Optional<Long> fileRecordCount;
private final Optional<Long> deletedRowCount;

@JsonCreator
public CommitTaskData(
@@ -42,7 +44,9 @@ public CommitTaskData(
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("content") FileContent content,
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile)
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile,
@JsonProperty("fileRecordCount") Optional<Long> fileRecordCount,
@JsonProperty("deletedRowCount") Optional<Long> deletedRowCount)
{
this.path = requireNonNull(path, "path is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
@@ -52,6 +56,11 @@ public CommitTaskData(
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.content = requireNonNull(content, "content is null");
this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null");
this.fileRecordCount = requireNonNull(fileRecordCount, "fileRecordCount is null");
fileRecordCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "fileRecordCount cannot be negative"));
this.deletedRowCount = requireNonNull(deletedRowCount, "deletedRowCount is null");
deletedRowCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "deletedRowCount cannot be negative"));
checkArgument(fileRecordCount.isPresent() == deletedRowCount.isPresent(), "fileRecordCount and deletedRowCount must be specified together");
checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
}

@@ -102,4 +111,16 @@ public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}

@JsonProperty
public Optional<Long> getFileRecordCount()
{
return fileRecordCount;
}

@JsonProperty
public Optional<Long> getDeletedRowCount()
{
return deletedRowCount;
}
}
@@ -86,6 +86,8 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IsolationLevel;
@@ -137,6 +139,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -192,7 +196,9 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.FileContent.POSITION_DELETES;
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.versionHintLocation;
@@ -1400,77 +1406,118 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col

Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());

RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
if (!table.getEnforcedPredicate().isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(table.getEnforcedPredicate()));
}
Map<String, List<CommitTaskData>> deletesByFilePath = commitTasks.stream()
Contributor: finishWrite method is now ~150 lines long. Please consider refactoring the method into smaller building blocks to ensure good readability in the weeks and months to come.

Member: follow-up

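One possible shape for the refactoring suggested above, sketched with hypothetical helper names (not part of this PR); it reuses the POSITION_DELETES, groupingBy, and toImmutableMap static imports added by this change and calls the new fileIsFullyDeleted method introduced below:

private Map<String, List<CommitTaskData>> groupPositionDeletesByDataFile(List<CommitTaskData> commitTasks)
{
    // Only position delete tasks reference a data file; group them by that file path.
    return commitTasks.stream()
            .filter(task -> task.getContent() == POSITION_DELETES)
            .collect(groupingBy(task -> task.getReferencedDataFile().orElseThrow()));
}

private Map<String, List<CommitTaskData>> findFullyDeletedFiles(Map<String, List<CommitTaskData>> deletesByFilePath)
{
    // Keep only the data files whose accumulated position deletes cover every row.
    return deletesByFilePath.entrySet().stream()
            .filter(entry -> fileIsFullyDeleted(entry.getValue()))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

With helpers like these, finishWrite would open with two calls instead of the inline stream pipelines shown below.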
.filter(task -> task.getContent() == POSITION_DELETES)
.collect(groupingBy(task -> task.getReferencedDataFile().orElseThrow()));
Map<String, List<CommitTaskData>> fullyDeletedFiles = deletesByFilePath
.entrySet().stream()
.filter(entry -> fileIsFullyDeleted(entry.getValue()))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

if (!deletesByFilePath.keySet().equals(fullyDeletedFiles.keySet()) || commitTasks.stream().anyMatch(task -> task.getContent() == FileContent.DATA)) {
RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
if (!table.getEnforcedPredicate().isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(table.getEnforcedPredicate()));
}

IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
rowDelta.validateNoConflictingDataFiles();
}
IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
rowDelta.validateNoConflictingDataFiles();
}

if (runUpdateValidations) {
// Ensure a row that is updated by this commit was not deleted by a separate commit
rowDelta.validateDeletedFiles();
rowDelta.validateNoConflictingDeleteFiles();
}
if (runUpdateValidations) {
// Ensure a row that is updated by this commit was not deleted by a separate commit
rowDelta.validateDeletedFiles();
rowDelta.validateNoConflictingDeleteFiles();
}

ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
switch (task.getContent()) {
case POSITION_DELETES:
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
switch (task.getContent()) {
case POSITION_DELETES:
if (fullyDeletedFiles.containsKey(task.getReferencedDataFile().orElseThrow())) {
continue;
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
break;
case DATA:
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addRows(builder.build());
writtenFiles.add(task.getPath());
break;
default:
throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
break;
case DATA:
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!icebergTable.spec().fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addRows(builder.build());
writtenFiles.add(task.getPath());
break;
default:
throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
}
}

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
}

rowDelta.validateDataFilesExist(referencedDataFiles.build());
try {
rowDelta.commit();
}
catch (ValidationException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e);
}
}

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
if (!fullyDeletedFiles.isEmpty()) {
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(table.getTableLocation()));
for (List<CommitTaskData> commitTasksToCleanUp : fullyDeletedFiles.values()) {
for (CommitTaskData commitTaskData : commitTasksToCleanUp) {
if (!fileSystem.delete(new Path(commitTaskData.getPath()), false)) {
log.warn("Failed to clean up uncommitted position delete file: %s", commitTaskData.getPath());
}
}
}
}
catch (IOException e) {
log.warn(e, "Failed to clean up uncommitted position delete files");
Member: why not propagate here?

Member: probably because we create delete files for this a few lines below? Though I'd expect propagation too.

Member (Author): Failing to delete a file that is not going to be committed didn't seem like enough of a problem to warrant failing the query. If we have limited filesystem permissions and can't delete files, for example, we would still be able to write deletes. This would eventually get picked up by a remove_orphan_files collection.

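A minimal sketch of how that best-effort cleanup could be packaged, assuming a hypothetical helper name (not part of this PR). Unlike the loop above, it catches IOException per file so one failure does not skip the remaining deletes, while still only logging failures, matching the rationale that uncommitted files can be left for a later orphan-file cleanup:

private void deleteUncommittedDeleteFiles(FileSystem fileSystem, Map<String, List<CommitTaskData>> fullyDeletedFiles)
{
    for (List<CommitTaskData> tasks : fullyDeletedFiles.values()) {
        for (CommitTaskData task : tasks) {
            try {
                // Best effort: these position delete files were never committed, so a leftover file is harmless.
                if (!fileSystem.delete(new Path(task.getPath()), false)) {
                    log.warn("Failed to clean up uncommitted position delete file: %s", task.getPath());
                }
            }
            catch (IOException e) {
                log.warn(e, "Failed to clean up uncommitted position delete file: %s", task.getPath());
            }
        }
    }
}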
}
}

rowDelta.validateDataFilesExist(referencedDataFiles.build());
try {
rowDelta.commit();
if (!fullyDeletedFiles.isEmpty()) {
DeleteFiles deleteFiles = transaction.newDelete();
fullyDeletedFiles.keySet().forEach(deleteFiles::deleteFile);
deleteFiles.commit();
}
transaction.commitTransaction();
}
catch (ValidationException e) {
@@ -1479,6 +1526,23 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
transaction = null;
}

private boolean fileIsFullyDeleted(List<CommitTaskData> positionDeletes)
{
checkArgument(!positionDeletes.isEmpty(), "Cannot call fileIsFullyDeleted with an empty list");
String referencedDataFile = positionDeletes.get(0).getReferencedDataFile().orElseThrow();
long fileRecordCount = positionDeletes.get(0).getFileRecordCount().orElseThrow();
checkArgument(positionDeletes.stream()
.allMatch(positionDelete -> positionDelete.getReferencedDataFile().orElseThrow().equals(referencedDataFile) &&
positionDelete.getFileRecordCount().orElseThrow() == fileRecordCount),
"All position deletes must be for the same file and have the same fileRecordCount");
long deletedRowCount = positionDeletes.stream()
.map(CommitTaskData::getDeletedRowCount)
.mapToLong(Optional::orElseThrow)
.sum();
checkState(deletedRowCount <= fileRecordCount, "Found more deleted rows than exist in the file");
return fileRecordCount == deletedRowCount;
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
@@ -322,6 +322,8 @@ private void closeWriter(WriteContext writeContext)
PartitionSpecParser.toJson(partitionSpec),
writeContext.getPartitionData().map(PartitionData::toJson),
DATA,
Optional.empty(),
Optional.empty(),
Optional.empty());

commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
@@ -318,7 +318,8 @@ public ConnectorPageSource createPageSource(
jsonCodec,
session,
split.getFileFormat(),
table.getStorageProperties());
table.getStorageProperties(),
split.getFileRecordCount());

Supplier<IcebergPageSink> updatedRowPageSinkSupplier = () -> new IcebergPageSink(
tableSchema,
@@ -37,6 +37,7 @@ public class IcebergSplit
private final long start;
private final long length;
private final long fileSize;
private final long fileRecordCount;
private final IcebergFileFormat fileFormat;
private final List<HostAddress> addresses;
private final String partitionSpecJson;
@@ -49,6 +50,7 @@ public IcebergSplit(
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileRecordCount") long fileRecordCount,
@JsonProperty("fileFormat") IcebergFileFormat fileFormat,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@@ -59,6 +61,7 @@ public IcebergSplit(
this.start = start;
this.length = length;
this.fileSize = fileSize;
this.fileRecordCount = fileRecordCount;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
@@ -103,6 +106,12 @@ public long getFileSize()
return fileSize;
}

@JsonProperty
public long getFileRecordCount()
{
return fileRecordCount;
}

@JsonProperty
public IcebergFileFormat getFileFormat()
{
@@ -383,6 +383,7 @@ private static IcebergSplit toIcebergSplit(FileScanTask task)
task.start(),
task.length(),
task.file().fileSizeInBytes(),
task.file().recordCount(),
IcebergFileFormat.fromIceberg(task.file().format()),
ImmutableList.of(),
PartitionSpecParser.toJson(task.spec()),