diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
index 31d7c9ee9277..f1585499e67d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
@@ -32,6 +32,8 @@ public class CommitTaskData
     private final Optional<String> partitionDataJson;
     private final FileContent content;
     private final Optional<String> referencedDataFile;
+    private final Optional<Long> fileRecordCount;
+    private final Optional<Long> deletedRowCount;
 
     @JsonCreator
     public CommitTaskData(
@@ -42,7 +44,9 @@ public CommitTaskData(
             @JsonProperty("partitionSpecJson") String partitionSpecJson,
             @JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
             @JsonProperty("content") FileContent content,
-            @JsonProperty("referencedDataFile") Optional<String> referencedDataFile)
+            @JsonProperty("referencedDataFile") Optional<String> referencedDataFile,
+            @JsonProperty("fileRecordCount") Optional<Long> fileRecordCount,
+            @JsonProperty("deletedRowCount") Optional<Long> deletedRowCount)
     {
         this.path = requireNonNull(path, "path is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
@@ -52,6 +56,11 @@ public CommitTaskData(
         this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
         this.content = requireNonNull(content, "content is null");
         this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null");
+        this.fileRecordCount = requireNonNull(fileRecordCount, "fileRecordCount is null");
+        fileRecordCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "fileRecordCount cannot be negative"));
+        this.deletedRowCount = requireNonNull(deletedRowCount, "deletedRowCount is null");
+        deletedRowCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "deletedRowCount cannot be negative"));
+        checkArgument(fileRecordCount.isPresent() == deletedRowCount.isPresent(), "fileRecordCount and deletedRowCount must be specified together");
         checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
     }
 
@@ -102,4 +111,16 @@ public Optional<String> getReferencedDataFile()
     {
         return referencedDataFile;
     }
+
+    @JsonProperty
+    public Optional<Long> getFileRecordCount()
+    {
+        return fileRecordCount;
+    }
+
+    @JsonProperty
+    public Optional<Long> getDeletedRowCount()
+    {
+        return deletedRowCount;
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
index 50c6e5c7a3b0..07b803c8c7cb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -40,6 +40,7 @@ public class IcebergColumnHandle
     public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
     public static final String TRINO_ROW_ID_NAME = "$row_id";
 
+    public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 2;
     public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 3;
     public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 4;
 
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
index b9451cfaf8d8..511c032fe15b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
@@ -110,9 +110,10 @@ public void storeMergedRows(Page page)
 
                 int index = position;
                 FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, ignored -> {
-                    int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(2), index));
-                    String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(3), index).toStringUtf8();
-                    return new FileDeletion(partitionSpecId, partitionData);
+                    long fileRecordCount = BIGINT.getLong(rowIdRow.getField(2), index);
+                    int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(3), index));
+                    String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(4), index).toStringUtf8();
+                    return new FileDeletion(partitionSpecId, partitionData, fileRecordCount);
                 });
 
                 deletion.rowsToDelete().addLong(rowPosition);
@@ -129,7 +130,8 @@ public CompletableFuture<Collection<Slice>> finish()
             ConnectorPageSink sink = createPositionDeletePageSink(
                     dataFilePath.toStringUtf8(),
                     partitionsSpecs.get(deletion.partitionSpecId()),
-                    deletion.partitionDataJson());
+                    deletion.partitionDataJson(),
+                    deletion.fileRecordCount());
 
             fragments.addAll(writePositionDeletes(sink, deletion.rowsToDelete()));
         });
@@ -143,7 +145,7 @@ public void abort()
         insertPageSink.abort();
     }
 
-    private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson)
+    private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson, long fileRecordCount)
     {
         Optional<PartitionData> partitionData = Optional.empty();
         if (partitionSpec.isPartitioned()) {
@@ -163,7 +165,8 @@ private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, Part
                 jsonCodec,
                 session,
                 fileFormat,
-                storageProperties);
+                storageProperties,
+                fileRecordCount);
     }
 
     private static Collection<Slice> writePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete)
@@ -201,12 +204,14 @@ private static class FileDeletion
     {
         private final int partitionSpecId;
         private final String partitionDataJson;
+        private final long fileRecordCount;
         private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();
 
-        public FileDeletion(int partitionSpecId, String partitionDataJson)
+        public FileDeletion(int partitionSpecId, String partitionDataJson, long fileRecordCount)
         {
             this.partitionSpecId = partitionSpecId;
             this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
+            this.fileRecordCount = fileRecordCount;
         }
 
         public int partitionSpecId()
@@ -219,6 +224,11 @@ public String partitionDataJson()
             return partitionDataJson;
         }
 
+        public long fileRecordCount()
+        {
+            return fileRecordCount;
+        }
+
         public LongBitmapDataProvider rowsToDelete()
         {
             return rowsToDelete;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 70fec7ee7f55..ea8cb385fb80 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -98,6 +98,8 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IsolationLevel;
@@ -125,6 +127,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.LongType;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StringType;
 import org.apache.iceberg.types.Types.StructType;
@@ -152,6 +155,7 @@
 import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -170,6 +174,7 @@
 import static io.trino.plugin.iceberg.ConstraintExtractor.extractTupleDomain;
 import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
 import static io.trino.plugin.iceberg.IcebergAnalyzeProperties.getColumnNames;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_FILE_RECORD_COUNT;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_ROW_ID;
@@ -231,7 +236,9 @@
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.joining;
+import static org.apache.iceberg.FileContent.POSITION_DELETES;
 import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
 import static org.apache.iceberg.ReachableFileUtil.versionHintLocation;
 import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
@@ -1637,6 +1644,7 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto
         StructType type = StructType.of(ImmutableList.<NestedField>builder()
                 .add(MetadataColumns.FILE_PATH)
                 .add(MetadataColumns.ROW_POSITION)
+                .add(NestedField.required(TRINO_MERGE_FILE_RECORD_COUNT, "file_record_count", LongType.get()))
                 .add(NestedField.required(TRINO_MERGE_PARTITION_SPEC_ID, "partition_spec_id", IntegerType.get()))
                 .add(NestedField.required(TRINO_MERGE_PARTITION_DATA, "partition_data", StringType.get()))
                 .build());
@@ -1705,78 +1713,116 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
         Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
 
-        RowDelta rowDelta = transaction.newRowDelta();
-        table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
-        TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
-        if (!dataColumnPredicate.isAll()) {
-            rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
-        }
+        Map<String, List<CommitTaskData>> deletesByFilePath = commitTasks.stream()
+                .filter(task -> task.getContent() == POSITION_DELETES)
+                .collect(groupingBy(task -> task.getReferencedDataFile().orElseThrow()));
+        Map<String, List<CommitTaskData>> fullyDeletedFiles = deletesByFilePath
+                .entrySet().stream()
+                .filter(entry -> fileIsFullyDeleted(entry.getValue()))
+                .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (!deletesByFilePath.keySet().equals(fullyDeletedFiles.keySet()) || commitTasks.stream().anyMatch(task -> task.getContent() == FileContent.DATA)) {
+            RowDelta rowDelta = transaction.newRowDelta();
+            table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
+            TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
+            if (!dataColumnPredicate.isAll()) {
+                rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
+            }
+            IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
+            if (isolationLevel == IsolationLevel.SERIALIZABLE) {
+                rowDelta.validateNoConflictingDataFiles();
+            }
 
-        IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
-        if (isolationLevel == IsolationLevel.SERIALIZABLE) {
-            rowDelta.validateNoConflictingDataFiles();
-        }
+            if (runUpdateValidations) {
+                // Ensure a row that is updated by this commit was not deleted by a separate commit
+                rowDelta.validateDeletedFiles();
+                rowDelta.validateNoConflictingDeleteFiles();
+            }
 
-        if (runUpdateValidations) {
-            // Ensure a row that is updated by this commit was not deleted by a separate commit
-            rowDelta.validateDeletedFiles();
-            rowDelta.validateNoConflictingDeleteFiles();
-        }
+            ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
+            ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
+            for (CommitTaskData task : commitTasks) {
+                PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
+                Type[] partitionColumnTypes = partitionSpec.fields().stream()
+                        .map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
+                        .toArray(Type[]::new);
+                switch (task.getContent()) {
+                    case POSITION_DELETES:
+                        if (fullyDeletedFiles.containsKey(task.getReferencedDataFile().orElseThrow())) {
+                            continue;
+                        }
 
-        ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
-        ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
-        for (CommitTaskData task : commitTasks) {
-            PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
-            Type[] partitionColumnTypes = partitionSpec.fields().stream()
-                    .map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
-                    .toArray(Type[]::new);
-            switch (task.getContent()) {
-                case POSITION_DELETES:
-                    FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
-                            .withPath(task.getPath())
-                            .withFormat(task.getFileFormat().toIceberg())
-                            .ofPositionDeletes()
-                            .withFileSizeInBytes(task.getFileSizeInBytes())
-                            .withMetrics(task.getMetrics().metrics());
-
-                    if (!partitionSpec.fields().isEmpty()) {
-                        String partitionDataJson = task.getPartitionDataJson()
-                                .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
-                        deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
-                    }
+                        FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
+                                .withPath(task.getPath())
+                                .withFormat(task.getFileFormat().toIceberg())
+                                .ofPositionDeletes()
+                                .withFileSizeInBytes(task.getFileSizeInBytes())
+                                .withMetrics(task.getMetrics().metrics());
+
+                        if (!partitionSpec.fields().isEmpty()) {
+                            String partitionDataJson = task.getPartitionDataJson()
+                                    .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+                            deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+                        }
-                    rowDelta.addDeletes(deleteBuilder.build());
-                    writtenFiles.add(task.getPath());
-                    task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
-                    break;
-                case DATA:
-                    DataFiles.Builder builder = DataFiles.builder(partitionSpec)
-                            .withPath(task.getPath())
-                            .withFormat(task.getFileFormat().toIceberg())
-                            .withFileSizeInBytes(task.getFileSizeInBytes())
-                            .withMetrics(task.getMetrics().metrics());
-
-                    if (!icebergTable.spec().fields().isEmpty()) {
-                        String partitionDataJson = task.getPartitionDataJson()
-                                .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
-                        builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
-                    }
-                    rowDelta.addRows(builder.build());
-                    writtenFiles.add(task.getPath());
-                    break;
-                default:
-                    throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
+                        rowDelta.addDeletes(deleteBuilder.build());
+                        writtenFiles.add(task.getPath());
+                        task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
+                        break;
+                    case DATA:
+                        DataFiles.Builder builder = DataFiles.builder(partitionSpec)
+                                .withPath(task.getPath())
+                                .withFormat(task.getFileFormat().toIceberg())
+                                .withFileSizeInBytes(task.getFileSizeInBytes())
+                                .withMetrics(task.getMetrics().metrics());
+
+                        if (!icebergTable.spec().fields().isEmpty()) {
+                            String partitionDataJson = task.getPartitionDataJson()
+                                    .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+                            builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+                        }
+                        rowDelta.addRows(builder.build());
+                        writtenFiles.add(task.getPath());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unsupported task content: " + task.getContent());
+                }
+            }
+
+            // try to leave as little garbage as possible behind
+            if (table.getRetryMode() != NO_RETRIES) {
+                cleanExtraOutputFiles(session, writtenFiles.build());
+            }
+
+            rowDelta.validateDataFilesExist(referencedDataFiles.build());
+            try {
+                rowDelta.commit();
+            }
+            catch (ValidationException e) {
+                throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e);
+            }
         }
 
-        // try to leave as little garbage as possible behind
-        if (table.getRetryMode() != NO_RETRIES) {
-            cleanExtraOutputFiles(session, writtenFiles.build());
+        if (!fullyDeletedFiles.isEmpty()) {
+            try {
+                TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+                for (List<CommitTaskData> commitTasksToCleanUp : fullyDeletedFiles.values()) {
+                    for (CommitTaskData commitTaskData : commitTasksToCleanUp) {
+                        fileSystem.deleteFile(commitTaskData.getPath());
+                    }
+                }
+            }
+            catch (IOException e) {
+                log.warn(e, "Failed to clean up uncommitted position delete files");
+            }
         }
 
-        rowDelta.validateDataFilesExist(referencedDataFiles.build());
         try {
-            rowDelta.commit();
+            if (!fullyDeletedFiles.isEmpty()) {
+                DeleteFiles deleteFiles = transaction.newDelete();
+                fullyDeletedFiles.keySet().forEach(deleteFiles::deleteFile);
+                deleteFiles.commit();
+            }
             transaction.commitTransaction();
         }
         catch (ValidationException e) {
@@ -1785,6 +1831,23 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
         transaction = null;
     }
 
+    private static boolean fileIsFullyDeleted(List<CommitTaskData> positionDeletes)
+    {
+        checkArgument(!positionDeletes.isEmpty(), "Cannot call fileIsFullyDeleted with an empty list");
+        String referencedDataFile = positionDeletes.get(0).getReferencedDataFile().orElseThrow();
+        long fileRecordCount = positionDeletes.get(0).getFileRecordCount().orElseThrow();
+        checkArgument(positionDeletes.stream().allMatch(positionDelete ->
+                positionDelete.getReferencedDataFile().orElseThrow().equals(referencedDataFile) &&
+                positionDelete.getFileRecordCount().orElseThrow() == fileRecordCount),
+                "All position deletes must be for the same file and have the same fileRecordCount");
+        long deletedRowCount = positionDeletes.stream()
+                .map(CommitTaskData::getDeletedRowCount)
+                .mapToLong(Optional::orElseThrow)
+                .sum();
+        checkState(deletedRowCount <= fileRecordCount, "Found more deleted rows than exist in the file");
+        return fileRecordCount == deletedRowCount;
+    }
+
     @Override
     public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 9499b7ccca9b..6ac967b91b00 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -312,6 +312,8 @@ private void closeWriter(WriteContext writeContext)
                 PartitionSpecParser.toJson(partitionSpec),
                 writeContext.getPartitionData().map(PartitionData::toJson),
                 DATA,
+                Optional.empty(),
+                Optional.empty(),
                 Optional.empty());
 
         commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 4ab1227a7cf3..bae25dcf0547 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -141,6 +141,7 @@
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
 import static io.trino.parquet.reader.ParquetReaderColumn.getParquetReaderFields;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_FILE_RECORD_COUNT;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA;
 import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
@@ -280,6 +281,9 @@ else if (identity.getId() == MetadataColumns.FILE_PATH.fieldId()) {
         else if (identity.getId() == ROW_POSITION.fieldId()) {
             requiredColumns.add(new IcebergColumnHandle(identity, BIGINT, ImmutableList.of(), BIGINT, Optional.empty()));
         }
+        else if (identity.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
+            requiredColumns.add(new IcebergColumnHandle(identity, BIGINT, ImmutableList.of(), BIGINT, Optional.empty()));
+        }
         else if (identity.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
             requiredColumns.add(new IcebergColumnHandle(identity, INTEGER, ImmutableList.of(), INTEGER, Optional.empty()));
         }
@@ -310,6 +314,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
                 inputfile,
                 split.getStart(),
                 split.getLength(),
+                split.getFileRecordCount(),
                 partitionSpec.specId(),
                 split.getPartitionDataJson(),
                 split.getFileFormat(),
@@ -356,7 +361,8 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
                 jsonCodec,
                 session,
                 split.getFileFormat(),
-                table.getStorageProperties());
+                table.getStorageProperties(),
+                split.getFileRecordCount());
 
         Supplier<IcebergPageSink> updatedRowPageSinkSupplier = () -> new IcebergPageSink(
                 tableSchema,
@@ -493,6 +499,7 @@ private ConnectorPageSource openDeletes(
                 fileSystem.newInputFile(delete.path(), delete.fileSizeInBytes()),
                 0,
                 delete.fileSizeInBytes(),
+                delete.recordCount(),
                 0,
                 "",
                 IcebergFileFormat.fromIceberg(delete.format()),
@@ -511,6 +518,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
             TrinoInputFile inputFile,
             long start,
             long length,
+            long fileRecordCount,
             int partitionSpecId,
             String partitionData,
             IcebergFileFormat fileFormat,
@@ -526,6 +534,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
                         inputFile,
                         start,
                         length,
+                        fileRecordCount,
                         partitionSpecId,
                         partitionData,
                         dataColumns,
@@ -548,6 +557,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
                         inputFile,
                         start,
                         length,
+                        fileRecordCount,
                         partitionSpecId,
                         partitionData,
                         dataColumns,
@@ -563,6 +573,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
                         inputFile,
                         start,
                         length,
+                        fileRecordCount,
                         partitionSpecId,
                         partitionData,
                         fileSchema,
@@ -577,6 +588,7 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource(
             TrinoInputFile inputFile,
             long start,
             long length,
+            long fileRecordCount,
            int partitionSpecId,
             String partitionData,
             List<IcebergColumnHandle> columns,
@@ -648,6 +660,9 @@ else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
                 else if (column.isRowPositionColumn()) {
                     columnAdaptations.add(ColumnAdaptation.positionColumn());
                 }
+                else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
+                    columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(column.getType(), fileRecordCount)));
+                }
                 else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
                     columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId)));
                 }
@@ -916,6 +931,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
             TrinoInputFile inputFile,
             long start,
             long length,
+            long fileRecordCount,
             int partitionSpecId,
             String partitionData,
             List<IcebergColumnHandle> regularColumns,
@@ -1010,6 +1026,9 @@ else if (column.isRowPositionColumn()) {
                     constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
                     parquetSourceChannel++;
                 }
+                else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
+                    constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), fileRecordCount));
+                }
                 else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
                     constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId));
                 }
@@ -1088,6 +1107,7 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource(
             TrinoInputFile inputFile,
             long start,
             long length,
+            long fileRecordCount,
             int partitionSpecId,
             String partitionData,
             Schema fileSchema,
@@ -1149,6 +1169,9 @@ else if (column.isRowPositionColumn()) {
                 constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel);
                 avroSourceChannel++;
             }
+            else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
+                constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), fileRecordCount));
+            }
             else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
                 constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId));
             }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
index ac9dc0612440..81cd45d4bf9a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@@ -39,6 +39,7 @@ public class IcebergSplit
     private final long start;
     private final long length;
     private final long fileSize;
+    private final long fileRecordCount;
     private final IcebergFileFormat fileFormat;
     private final List<HostAddress> addresses;
     private final String partitionSpecJson;
@@ -52,6 +53,7 @@ public IcebergSplit(
             @JsonProperty("start") long start,
             @JsonProperty("length") long length,
             @JsonProperty("fileSize") long fileSize,
+            @JsonProperty("fileRecordCount") long fileRecordCount,
             @JsonProperty("fileFormat") IcebergFileFormat fileFormat,
             @JsonProperty("addresses") List<HostAddress> addresses,
             @JsonProperty("partitionSpecJson") String partitionSpecJson,
@@ -63,6 +65,7 @@ public IcebergSplit(
         this.start = start;
         this.length = length;
         this.fileSize = fileSize;
+        this.fileRecordCount = fileRecordCount;
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
         this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
         this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
@@ -108,6 +111,12 @@ public long getFileSize()
         return fileSize;
     }
 
+    @JsonProperty
+    public long getFileRecordCount()
+    {
+        return fileRecordCount;
+    }
+
     @JsonProperty
     public IcebergFileFormat getFileFormat()
     {
@@ -167,9 +176,12 @@ public String toString()
         ToStringHelper helper = toStringHelper(this)
                 .addValue(path)
                 .add("start", start)
-                .add("length", length);
+                .add("length", length)
+                .add("records", fileRecordCount);
         if (!deletes.isEmpty()) {
             helper.add("deleteFiles", deletes.size());
+            helper.add("deleteRecords", deletes.stream()
+                    .mapToLong(DeleteFile::recordCount).sum());
         }
         return helper.toString();
     }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index 7e8957862ef0..037c19d39ea8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -427,6 +427,7 @@ private IcebergSplit toIcebergSplit(FileScanTask task)
                 task.start(),
                 task.length(),
                 task.file().fileSizeInBytes(),
+                task.file().recordCount(),
                 IcebergFileFormat.fromIceberg(task.file().format()),
                 ImmutableList.of(),
                 PartitionSpecParser.toJson(task.spec()),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
index feb28ac3afff..209e147c3fa6 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
@@ -41,6 +41,7 @@ public final class DeleteFile
     private final FileContent content;
     private final String path;
     private final FileFormat format;
+    private final long recordCount;
     private final long fileSizeInBytes;
     private final List<Integer> equalityFieldIds;
     private final Map<Integer, byte[]> lowerBounds;
@@ -57,6 +58,7 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
                 deleteFile.content(),
                 deleteFile.path().toString(),
                 deleteFile.format(),
+                deleteFile.recordCount(),
                 deleteFile.fileSizeInBytes(),
                 Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of),
                 lowerBounds,
@@ -68,6 +70,7 @@ public DeleteFile(
             FileContent content,
             String path,
             FileFormat format,
+            long recordCount,
             long fileSizeInBytes,
             List<Integer> equalityFieldIds,
             Map<Integer, byte[]> lowerBounds,
@@ -76,6 +79,7 @@ public DeleteFile(
         this.content = requireNonNull(content, "content is null");
         this.path = requireNonNull(path, "path is null");
         this.format = requireNonNull(format, "format is null");
+        this.recordCount = recordCount;
         this.fileSizeInBytes = fileSizeInBytes;
         this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null"));
         this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null"));
@@ -100,6 +104,12 @@ public FileFormat format()
         return format;
     }
 
+    @JsonProperty
+    public long recordCount()
+    {
+        return recordCount;
+    }
+
     @JsonProperty
     public long fileSizeInBytes()
     {
@@ -138,6 +148,7 @@ public String toString()
     {
         return toStringHelper(this)
                 .addValue(path)
+                .add("records", recordCount)
                 .toString();
     }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java
index 2063b9a6b101..b9a16d602218 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java
@@ -57,9 +57,11 @@ public class IcebergPositionDeletePageSink
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final IcebergFileWriter writer;
     private final IcebergFileFormat fileFormat;
+    private final long fileRecordCount;
 
     private long validationCpuNanos;
     private boolean writtenData;
+    private long deletedRowCount;
 
     public IcebergPositionDeletePageSink(
             String dataFilePath,
@@ -71,13 +73,15 @@ public IcebergPositionDeletePageSink(
             JsonCodec<CommitTaskData> jsonCodec,
             ConnectorSession session,
             IcebergFileFormat fileFormat,
-            Map<String, String> storageProperties)
+            Map<String, String> storageProperties,
+            long fileRecordCount)
     {
         this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null");
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
         this.partition = requireNonNull(partition, "partition is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+        this.fileRecordCount = fileRecordCount;
         // prepend query id to a file name so we can determine which files were written by which query. This is needed for opportunistic cleanup of extra files
         // which may be present for successfully completing query in presence of failure recovery mechanisms.
         String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID());
@@ -116,6 +120,7 @@ public CompletableFuture<?> appendPage(Page page)
         writer.appendRows(new Page(blocks));
 
         writtenData = true;
+        deletedRowCount += page.getPositionCount();
         return NOT_BLOCKED;
     }
 
@@ -133,7 +138,9 @@ public CompletableFuture<Collection<Slice>> finish()
                     PartitionSpecParser.toJson(partitionSpec),
                     partition.map(PartitionData::toJson),
                     FileContent.POSITION_DELETES,
-                    Optional.of(dataFilePath));
+                    Optional.of(dataFilePath),
+                    Optional.of(fileRecordCount),
+                    Optional.of(deletedRowCount));
             Long recordCount = task.getMetrics().recordCount();
             if (recordCount != null && recordCount > 0) {
                 commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index 7657105bebb0..b8f069570537 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -1555,7 +1555,6 @@ public void testPartitionPredicatePushdownWithHistoricalPartitionSpecs()
 
         // The old partition scheme is no longer used so pushdown using the hour transform is allowed
         assertUpdate("DELETE FROM " + tableName + " WHERE year(d) = 1969", 3);
-        assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize");
         assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3);
         assertThat(query(selectQuery))
                 .containsAll("VALUES 1, 8, 9, 10")
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAnalyze.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAnalyze.java
index 9102042c8ff1..d9776d33fb80 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAnalyze.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAnalyze.java
@@ -145,8 +145,8 @@ public void testAnalyzeWithSchemaEvolution()
                 ('nationkey', null, 25, 0, null, '0', '24'),
                 ('regionkey', null, 5, 0, null, '0', '4'),
                 ('name', null, 25, 0, null, null, null),
-                ('info', null, null, null, null, null, null),
-                (null, null, null, null, 50, null, null)""");
+                ('info', null, null, 0, null, null, null),
+                (null, null, null, null, 25, null, null)""");
 
         assertUpdate("ANALYZE " + tableName);
         assertQuery(
@@ -156,8 +156,8 @@ public void testAnalyzeWithSchemaEvolution()
                 ('nationkey', null, 25, 0, null, '0', '24'),
                 ('regionkey', null, 5, 0, null, '0', '4'),
                 ('name', null, 25, 0, null, null, null),
-                ('info', null, 25, null, null, null, null),
-                (null, null, null, null, 50, null, null)""");
+                ('info', null, 25, 0, null, null, null),
+                (null, null, null, null, 25, null, null)""");
 
         assertUpdate("DROP TABLE " + tableName);
     }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
index d16648c262e5..d0af279f6382 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
@@ -157,6 +157,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
                 0,
                 outputFile.length(),
                 outputFile.length(),
+                0, // This is incorrect, but the value is only used for delete operations
                 ORC,
                 ImmutableList.of(),
                 PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
index 0845a19a8de2..3ed9e86fa258 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -383,7 +383,7 @@ public void testDeletingEntireFile()
         assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2);
         assertUpdate("DELETE FROM " + tableName + " WHERE regionkey <= 2", "SELECT count(*) FROM nation WHERE regionkey <= 2");
         assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey > 2");
-        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2);
+        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1);
     }
 
     @Test
@@ -397,7 +397,7 @@ public void testDeletingEntireFileFromPartitionedTable()
         assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(4);
         assertUpdate("DELETE FROM " + tableName + " WHERE b % 2 = 0", 6);
         assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1), (1, 3), (1, 5), (2, 1), (2, 3), (2, 5)");
-        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(4);
+        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2);
     }
 
     @Test
@@ -411,7 +411,7 @@ public void testDeletingEntireFileWithNonTupleDomainConstraint()
         assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2);
         assertUpdate("DELETE FROM " + tableName + " WHERE regionkey % 2 = 1", "SELECT count(*) FROM nation WHERE regionkey % 2 = 1");
         assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey % 2 = 0");
-        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2);
+        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1);
     }
 
     @Test
@@ -431,7 +431,7 @@ public void testDeletingEntireFileWithMultipleSplits()
         long parentSnapshotId = (long) computeScalar("SELECT parent_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES");
         assertEquals(initialSnapshotId, parentSnapshotId);
         assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult();
-        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1);
+        assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(0);
     }
 
     @Test