From c6dfe27cee76a7e9bf2c286ae37adf28fffe1bd9 Mon Sep 17 00:00:00 2001
From: Alex Jo
Date: Thu, 3 Aug 2023 16:02:39 -0400
Subject: [PATCH] Do not use metadata deletes during Iceberg writes

The DeleteFiles operation does not validate conflicts like RowDelta does.
In the event a file is removed during a write operation, this can result
in an update operation being reflected as an insert. As a follow-up, this
optimization could be put back using the OverwriteFiles API.
---
 plugin/trino-iceberg/pom.xml | 6 +
 .../trino/plugin/iceberg/CommitTaskData.java | 25 +--
 .../plugin/iceberg/IcebergColumnHandle.java | 5 +-
 .../plugin/iceberg/IcebergMergeSink.java | 24 +--
 .../trino/plugin/iceberg/IcebergMetadata.java | 173 ++++++------------
 .../trino/plugin/iceberg/IcebergPageSink.java | 2 -
 .../iceberg/IcebergPageSourceProvider.java | 22 ---
 .../io/trino/plugin/iceberg/IcebergSplit.java | 12 +-
 .../plugin/iceberg/IcebergSplitSource.java | 1 -
 .../delete/IcebergPositionDeletePageSink.java | 11 +-
 .../BaseIcebergConnectorSmokeTest.java | 36 ++--
 .../iceberg/BaseIcebergConnectorTest.java | 2 +
 ...stIcebergNodeLocalDynamicSplitPruning.java | 1 -
 .../plugin/iceberg/TestIcebergStatistics.java | 22 +--
 .../trino/plugin/iceberg/TestIcebergV2.java | 125 ++++++++++++-
 15 files changed, 232 insertions(+), 235 deletions(-)

diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 34713e9f881f..3b7ae52e7d7c 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -444,6 +444,12 @@ test + + io.trino + trino-blackhole + test + + io.trino trino-exchange-filesystem diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java index f1585499e67d..9870f0b03502 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java @@ -19,7 +19,6 @@ import java.util.Optional; -import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; public class CommitTaskData @@ -32,8 +31,6 @@ public class CommitTaskData private final Optional partitionDataJson; private final FileContent content; private final Optional referencedDataFile; - private final Optional fileRecordCount; - private final Optional deletedRowCount; @JsonCreator public CommitTaskData( @@ -44,9 +41,7 @@ public CommitTaskData( @JsonProperty("partitionSpecJson") String partitionSpecJson, @JsonProperty("partitionDataJson") Optional partitionDataJson, @JsonProperty("content") FileContent content, - @JsonProperty("referencedDataFile") Optional referencedDataFile, - @JsonProperty("fileRecordCount") Optional fileRecordCount, - @JsonProperty("deletedRowCount") Optional deletedRowCount) + @JsonProperty("referencedDataFile") Optional referencedDataFile) { this.path = requireNonNull(path, "path is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); @@ -56,12 +51,6 @@ public CommitTaskData( this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null"); this.content = requireNonNull(content, "content is null"); this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null"); - this.fileRecordCount = requireNonNull(fileRecordCount, "fileRecordCount is null"); - fileRecordCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "fileRecordCount cannot be negative")); - this.deletedRowCount =
requireNonNull(deletedRowCount, "deletedRowCount is null"); - deletedRowCount.ifPresent(rowCount -> checkArgument(rowCount >= 0, "deletedRowCount cannot be negative")); - checkArgument(fileRecordCount.isPresent() == deletedRowCount.isPresent(), "fileRecordCount and deletedRowCount must be specified together"); - checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative"); } @JsonProperty @@ -111,16 +100,4 @@ public Optional getReferencedDataFile() { return referencedDataFile; } - - @JsonProperty - public Optional getFileRecordCount() - { - return fileRecordCount; - } - - @JsonProperty - public Optional getDeletedRowCount() - { - return deletedRowCount; - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java index 07b803c8c7cb..ebb49d12a00f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java @@ -40,9 +40,8 @@ public class IcebergColumnHandle public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1; public static final String TRINO_ROW_ID_NAME = "$row_id"; - public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 2; - public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 3; - public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 4; + public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2; + public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 3; private final ColumnIdentity baseColumnIdentity; private final Type baseType; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java index c6465b439f3b..e51f334e71d3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java @@ -109,10 +109,9 @@ public void storeMergedRows(Page page) int index = position; FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, ignored -> { - long fileRecordCount = BIGINT.getLong(rowIdRow.getField(2), index); - int partitionSpecId = INTEGER.getInt(rowIdRow.getField(3), index); - String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(4), index).toStringUtf8(); - return new FileDeletion(partitionSpecId, partitionData, fileRecordCount); + int partitionSpecId = INTEGER.getInt(rowIdRow.getField(2), index); + String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(3), index).toStringUtf8(); + return new FileDeletion(partitionSpecId, partitionData); }); deletion.rowsToDelete().addLong(rowPosition); @@ -129,8 +128,7 @@ public CompletableFuture> finish() ConnectorPageSink sink = createPositionDeletePageSink( dataFilePath.toStringUtf8(), partitionsSpecs.get(deletion.partitionSpecId()), - deletion.partitionDataJson(), - deletion.fileRecordCount()); + deletion.partitionDataJson()); fragments.addAll(writePositionDeletes(sink, deletion.rowsToDelete())); }); @@ -144,7 +142,7 @@ public void abort() insertPageSink.abort(); } - private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson, long fileRecordCount) + private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec 
partitionSpec, String partitionDataJson) { Optional partitionData = Optional.empty(); if (partitionSpec.isPartitioned()) { @@ -164,8 +162,7 @@ private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, Part jsonCodec, session, fileFormat, - storageProperties, - fileRecordCount); + storageProperties); } private static Collection writePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete) @@ -203,14 +200,12 @@ private static class FileDeletion { private final int partitionSpecId; private final String partitionDataJson; - private final long fileRecordCount; private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap(); - public FileDeletion(int partitionSpecId, String partitionDataJson, long fileRecordCount) + public FileDeletion(int partitionSpecId, String partitionDataJson) { this.partitionSpecId = partitionSpecId; this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null"); - this.fileRecordCount = fileRecordCount; } public int partitionSpecId() @@ -223,11 +218,6 @@ public String partitionDataJson() return partitionDataJson; } - public long fileRecordCount() - { - return fileRecordCount; - } - public LongBitmapDataProvider rowsToDelete() { return rowsToDelete; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 277f9b2a0b21..cd81cc8a2ed5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -109,7 +109,6 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; -import org.apache.iceberg.FileContent; import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IsolationLevel; @@ -144,7 +143,6 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.IntegerType; -import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StringType; import org.apache.iceberg.types.Types.StructType; @@ -177,7 +175,6 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -193,7 +190,6 @@ import static io.trino.plugin.iceberg.ConstraintExtractor.extractTupleDomain; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergAnalyzeProperties.getColumnNames; -import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_FILE_RECORD_COUNT; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_ROW_ID; @@ -269,9 +265,7 @@ import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -import static java.util.stream.Collectors.groupingBy; import static 
java.util.stream.Collectors.joining; -import static org.apache.iceberg.FileContent.POSITION_DELETES; import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; import static org.apache.iceberg.ReachableFileUtil.versionHintLocation; import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; @@ -2110,7 +2104,6 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto StructType type = StructType.of(ImmutableList.builder() .add(MetadataColumns.FILE_PATH) .add(MetadataColumns.ROW_POSITION) - .add(NestedField.required(TRINO_MERGE_FILE_RECORD_COUNT, "file_record_count", LongType.get())) .add(NestedField.required(TRINO_MERGE_PARTITION_SPEC_ID, "partition_spec_id", IntegerType.get())) .add(NestedField.required(TRINO_MERGE_PARTITION_DATA, "partition_data", StringType.get())) .build()); @@ -2190,132 +2183,76 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); - Map> deletesByFilePath = commitTasks.stream() - .filter(task -> task.getContent() == POSITION_DELETES) - .collect(groupingBy(task -> task.getReferencedDataFile().orElseThrow())); - Map> fullyDeletedFiles = deletesByFilePath - .entrySet().stream() - .filter(entry -> fileIsFullyDeleted(entry.getValue())) - .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); - - if (!deletesByFilePath.keySet().equals(fullyDeletedFiles.keySet()) || commitTasks.stream().anyMatch(task -> task.getContent() == FileContent.DATA)) { - RowDelta rowDelta = transaction.newRowDelta(); - table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); - TupleDomain dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); - if (!dataColumnPredicate.isAll()) { - rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate)); - } - IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT)); - if (isolationLevel == IsolationLevel.SERIALIZABLE) { - rowDelta.validateNoConflictingDataFiles(); - } + RowDelta rowDelta = transaction.newRowDelta(); + table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); + TupleDomain dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); + if (!dataColumnPredicate.isAll()) { + rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate)); + } + IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT)); + if (isolationLevel == IsolationLevel.SERIALIZABLE) { + rowDelta.validateNoConflictingDataFiles(); + } - // Ensure a row that is updated by this commit was not deleted by a separate commit - rowDelta.validateDeletedFiles(); - rowDelta.validateNoConflictingDeleteFiles(); - - ImmutableSet.Builder writtenFiles = ImmutableSet.builder(); - ImmutableSet.Builder referencedDataFiles = ImmutableSet.builder(); - for (CommitTaskData task : commitTasks) { - PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson()); - Type[] partitionColumnTypes = partitionSpec.fields().stream() - .map(field -> field.transform().getResultType(schema.findType(field.sourceId()))) - .toArray(Type[]::new); - switch (task.getContent()) { - case 
POSITION_DELETES -> { - if (fullyDeletedFiles.containsKey(task.getReferencedDataFile().orElseThrow())) { - continue; - } - FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec) - .withPath(task.getPath()) - .withFormat(task.getFileFormat().toIceberg()) - .ofPositionDeletes() - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withMetrics(task.getMetrics().metrics()); - if (!partitionSpec.fields().isEmpty()) { - String partitionDataJson = task.getPartitionDataJson() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); - } - rowDelta.addDeletes(deleteBuilder.build()); - writtenFiles.add(task.getPath()); - task.getReferencedDataFile().ifPresent(referencedDataFiles::add); + // Ensure a row that is updated by this commit was not deleted by a separate commit + rowDelta.validateDeletedFiles(); + rowDelta.validateNoConflictingDeleteFiles(); + + ImmutableSet.Builder writtenFiles = ImmutableSet.builder(); + ImmutableSet.Builder referencedDataFiles = ImmutableSet.builder(); + for (CommitTaskData task : commitTasks) { + PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson()); + Type[] partitionColumnTypes = partitionSpec.fields().stream() + .map(field -> field.transform().getResultType(schema.findType(field.sourceId()))) + .toArray(Type[]::new); + switch (task.getContent()) { + case POSITION_DELETES -> { + FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec) + .withPath(task.getPath()) + .withFormat(task.getFileFormat().toIceberg()) + .ofPositionDeletes() + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withMetrics(task.getMetrics().metrics()); + if (!partitionSpec.fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); } - case DATA -> { - DataFiles.Builder builder = DataFiles.builder(partitionSpec) - .withPath(task.getPath()) - .withFormat(task.getFileFormat().toIceberg()) - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withMetrics(task.getMetrics().metrics()); - if (!icebergTable.spec().fields().isEmpty()) { - String partitionDataJson = task.getPartitionDataJson() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); - } - rowDelta.addRows(builder.build()); - writtenFiles.add(task.getPath()); + rowDelta.addDeletes(deleteBuilder.build()); + writtenFiles.add(task.getPath()); + task.getReferencedDataFile().ifPresent(referencedDataFiles::add); + } + case DATA -> { + DataFiles.Builder builder = DataFiles.builder(partitionSpec) + .withPath(task.getPath()) + .withFormat(task.getFileFormat().toIceberg()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withMetrics(task.getMetrics().metrics()); + if (!icebergTable.spec().fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); } - default -> throw new UnsupportedOperationException("Unsupported task content: " + task.getContent()); + rowDelta.addRows(builder.build()); + writtenFiles.add(task.getPath()); } 
- } - - // try to leave as little garbage as possible behind - if (retryMode != NO_RETRIES) { - cleanExtraOutputFiles(session, writtenFiles.build()); - } - - rowDelta.validateDataFilesExist(referencedDataFiles.build()); - try { - commit(rowDelta, session); - } - catch (ValidationException e) { - throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e); + default -> throw new UnsupportedOperationException("Unsupported task content: " + task.getContent()); } } - if (!fullyDeletedFiles.isEmpty()) { - try { - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - fileSystem.deleteFiles(fullyDeletedFiles.values().stream() - .flatMap(Collection::stream) - .map(CommitTaskData::getPath) - .map(Location::of) - .collect(toImmutableSet())); - } - catch (IOException e) { - log.warn(e, "Failed to clean up uncommitted position delete files"); - } + // try to leave as little garbage as possible behind + if (retryMode != NO_RETRIES) { + cleanExtraOutputFiles(session, writtenFiles.build()); } + rowDelta.validateDataFilesExist(referencedDataFiles.build()); try { - if (!fullyDeletedFiles.isEmpty()) { - DeleteFiles deleteFiles = transaction.newDelete(); - fullyDeletedFiles.keySet().forEach(deleteFiles::deleteFile); - commit(deleteFiles, session); - } + commit(rowDelta, session); transaction.commitTransaction(); } catch (ValidationException e) { throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e); } - transaction = null; - } - - private static boolean fileIsFullyDeleted(List positionDeletes) - { - checkArgument(!positionDeletes.isEmpty(), "Cannot call fileIsFullyDeletes with an empty list"); - String referencedDataFile = positionDeletes.get(0).getReferencedDataFile().orElseThrow(); - long fileRecordCount = positionDeletes.get(0).getFileRecordCount().orElseThrow(); - checkArgument(positionDeletes.stream().allMatch(positionDelete -> - positionDelete.getReferencedDataFile().orElseThrow().equals(referencedDataFile) && - positionDelete.getFileRecordCount().orElseThrow() == fileRecordCount), - "All position deletes must be for the same file and have the same fileRecordCount"); - long deletedRowCount = positionDeletes.stream() - .map(CommitTaskData::getDeletedRowCount) - .mapToLong(Optional::orElseThrow) - .sum(); - checkState(deletedRowCount <= fileRecordCount, "Found more deleted rows than exist in the file"); - return fileRecordCount == deletedRowCount; } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java index d26afe2d9b2e..b54d8f25fa9b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java @@ -398,8 +398,6 @@ private void closeWriter(int writerIndex) PartitionSpecParser.toJson(partitionSpec), writeContext.getPartitionData().map(PartitionData::toJson), DATA, - Optional.empty(), - Optional.empty(), Optional.empty()); commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task))); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 35e8a39c09cd..259ea8d5db6d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -141,7 +141,6 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; -import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_FILE_RECORD_COUNT; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; @@ -271,9 +270,6 @@ else if (identity.getId() == MetadataColumns.FILE_PATH.fieldId()) { else if (identity.getId() == ROW_POSITION.fieldId()) { requiredColumns.add(new IcebergColumnHandle(identity, BIGINT, ImmutableList.of(), BIGINT, Optional.empty())); } - else if (identity.getId() == TRINO_MERGE_FILE_RECORD_COUNT) { - requiredColumns.add(new IcebergColumnHandle(identity, BIGINT, ImmutableList.of(), BIGINT, Optional.empty())); - } else if (identity.getId() == TRINO_MERGE_PARTITION_SPEC_ID) { requiredColumns.add(new IcebergColumnHandle(identity, INTEGER, ImmutableList.of(), INTEGER, Optional.empty())); } @@ -303,7 +299,6 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) { inputfile, split.getStart(), split.getLength(), - split.getFileRecordCount(), partitionSpec.specId(), split.getPartitionDataJson(), split.getFileFormat(), @@ -449,7 +444,6 @@ private ConnectorPageSource openDeletes( fileSystem.newInputFile(Location.of(delete.path()), delete.fileSizeInBytes()), 0, delete.fileSizeInBytes(), - delete.recordCount(), 0, "", IcebergFileFormat.fromIceberg(delete.format()), @@ -467,7 +461,6 @@ public ReaderPageSourceWithRowPositions createDataPageSource( TrinoInputFile inputFile, long start, long length, - long fileRecordCount, int partitionSpecId, String partitionData, IcebergFileFormat fileFormat, @@ -483,7 +476,6 @@ public ReaderPageSourceWithRowPositions createDataPageSource( inputFile, start, length, - fileRecordCount, partitionSpecId, partitionData, dataColumns, @@ -506,7 +498,6 @@ public ReaderPageSourceWithRowPositions createDataPageSource( inputFile, start, length, - fileRecordCount, partitionSpecId, partitionData, dataColumns, @@ -525,7 +516,6 @@ public ReaderPageSourceWithRowPositions createDataPageSource( inputFile, start, length, - fileRecordCount, partitionSpecId, partitionData, fileSchema, @@ -540,7 +530,6 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource( TrinoInputFile inputFile, long start, long length, - long fileRecordCount, int partitionSpecId, String partitionData, List columns, @@ -612,9 +601,6 @@ else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) { else if (column.isRowPositionColumn()) { columnAdaptations.add(ColumnAdaptation.positionColumn()); } - else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) { - columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(column.getType(), fileRecordCount))); - } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) { columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId))); } @@ -882,7 +868,6 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( TrinoInputFile inputFile, long start, long length, - long fileRecordCount, int partitionSpecId, String partitionData, List regularColumns, @@ -973,9 +958,6 @@ else if 
(column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) { else if (column.isRowPositionColumn()) { pageSourceBuilder.addRowIndexColumn(); } - else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) { - pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), fileRecordCount)); - } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) { pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId)); } @@ -1085,7 +1067,6 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource( TrinoInputFile inputFile, long start, long length, - long fileRecordCount, int partitionSpecId, String partitionData, Schema fileSchema, @@ -1146,9 +1127,6 @@ else if (column.isRowPositionColumn()) { constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel); avroSourceChannel++; } - else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), fileRecordCount)); - } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) { constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index ebd1e1fc5622..20719a9d8dfb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -40,7 +40,6 @@ public class IcebergSplit private final long start; private final long length; private final long fileSize; - private final long fileRecordCount; private final IcebergFileFormat fileFormat; private final String partitionSpecJson; private final String partitionDataJson; @@ -53,7 +52,6 @@ public IcebergSplit( @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("fileSize") long fileSize, - @JsonProperty("fileRecordCount") long fileRecordCount, @JsonProperty("fileFormat") IcebergFileFormat fileFormat, @JsonProperty("partitionSpecJson") String partitionSpecJson, @JsonProperty("partitionDataJson") String partitionDataJson, @@ -64,7 +62,6 @@ public IcebergSplit( this.start = start; this.length = length; this.fileSize = fileSize; - this.fileRecordCount = fileRecordCount; this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null"); this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null"); @@ -109,12 +106,6 @@ public long getFileSize() return fileSize; } - @JsonProperty - public long getFileRecordCount() - { - return fileRecordCount; - } - @JsonProperty public IcebergFileFormat getFileFormat() { @@ -173,8 +164,7 @@ public String toString() ToStringHelper helper = toStringHelper(this) .addValue(path) .add("start", start) - .add("length", length) - .add("records", fileRecordCount); + .add("length", length); if (!deletes.isEmpty()) { helper.add("deleteFiles", deletes.size()); helper.add("deleteRecords", deletes.stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 0efebcf1a471..6e4ad08f6eea 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -463,7 +463,6 @@ private IcebergSplit toIcebergSplit(FileScanTask task) task.start(), task.length(), task.file().fileSizeInBytes(), - task.file().recordCount(), IcebergFileFormat.fromIceberg(task.file().format()), PartitionSpecParser.toJson(task.spec()), PartitionData.toJson(task.file().partition()), diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java index b8fa505b286b..0eb7b80318f5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/IcebergPositionDeletePageSink.java @@ -58,11 +58,9 @@ public class IcebergPositionDeletePageSink private final JsonCodec jsonCodec; private final IcebergFileWriter writer; private final IcebergFileFormat fileFormat; - private final long fileRecordCount; private long validationCpuNanos; private boolean writtenData; - private long deletedRowCount; public IcebergPositionDeletePageSink( String dataFilePath, @@ -74,15 +72,13 @@ public IcebergPositionDeletePageSink( JsonCodec jsonCodec, ConnectorSession session, IcebergFileFormat fileFormat, - Map storageProperties, - long fileRecordCount) + Map storageProperties) { this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null"); this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null"); this.partition = requireNonNull(partition, "partition is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); - this.fileRecordCount = fileRecordCount; // prepend query id to a file name so we can determine which files were written by which query. This is needed for opportunistic cleanup of extra files // which may be present for successfully completing query in presence of failure recovery mechanisms. 
String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID()); @@ -121,7 +117,6 @@ public CompletableFuture appendPage(Page page) writer.appendRows(new Page(blocks)); writtenData = true; - deletedRowCount += page.getPositionCount(); return NOT_BLOCKED; } @@ -139,9 +134,7 @@ public CompletableFuture> finish() PartitionSpecParser.toJson(partitionSpec), partition.map(PartitionData::toJson), FileContent.POSITION_DELETES, - Optional.of(dataFilePath), - Optional.of(fileRecordCount), - Optional.of(deletedRowCount)); + Optional.of(dataFilePath)); Long recordCount = task.getMetrics().recordCount(); if (recordCount != null && recordCount > 0) { commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task))); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index dd3f67d772cb..e2d3fec64e1b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -14,7 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; -import io.airlift.concurrent.MoreFutures; +import com.google.common.collect.Streams; import io.trino.Session; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; @@ -31,12 +31,15 @@ import org.testng.annotations.Test; import java.util.List; +import java.util.Optional; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.IntStream; +import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.concurrent.MoreFutures.tryGetFutureValue; import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_TABLE; @@ -47,6 +50,7 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.joining; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -124,28 +128,38 @@ public void testDeleteRowsConcurrently() int threads = 4; CyclicBarrier barrier = new CyclicBarrier(threads); ExecutorService executor = newFixedThreadPool(threads); + List rows = ImmutableList.of("(1, 0, 0, 0)", "(0, 1, 0, 0)", "(0, 0, 1, 0)", "(0, 0, 0, 1)"); + + String[] expectedErrors = new String[]{"Failed to commit Iceberg update to table:", "Failed to replace table due to concurrent updates:"}; try (TestTable table = new TestTable( getQueryRunner()::execute, "test_concurrent_delete", "(col0 INTEGER, col1 INTEGER, col2 INTEGER, col3 INTEGER)")) { String tableName = table.getName(); - assertUpdate("INSERT INTO " + tableName + " VALUES (0, 0, 0, 0)", 1); - assertUpdate("INSERT INTO " + tableName + " VALUES (1, 0, 0, 0)", 1); - assertUpdate("INSERT INTO " + tableName + " VALUES (0, 1, 0, 0)", 1); - assertUpdate("INSERT INTO " + tableName + " VALUES (0, 0, 1, 0)", 1); - assertUpdate("INSERT INTO " + tableName + " VALUES (0, 0, 0, 1)", 1); + assertUpdate("INSERT INTO " + tableName 
+ " VALUES " + String.join(", ", rows), 4); - List> futures = IntStream.range(0, threads) + List> futures = IntStream.range(0, threads) .mapToObj(threadNumber -> executor.submit(() -> { barrier.await(10, SECONDS); String columnName = "col" + threadNumber; - getQueryRunner().execute(format("DELETE FROM %s WHERE %s = 1", tableName, columnName)); - return (Void) null; + try { + getQueryRunner().execute(format("DELETE FROM %s WHERE %s = 1", tableName, columnName)); + return true; + } + catch (Exception e) { + assertThat(e.getMessage()).containsAnyOf(expectedErrors); + return false; + } })) .collect(toImmutableList()); - futures.forEach(MoreFutures::getFutureValue); - assertThat(query("SELECT max(col0), max(col1), max(col2), max(col3) FROM " + tableName)).matches("VALUES (0, 0, 0, 0)"); + Stream> expectedRows = Streams.mapWithIndex(futures.stream(), (future, index) -> { + boolean deleteSuccessful = tryGetFutureValue(future, 10, SECONDS).orElseThrow(); + return deleteSuccessful ? Optional.empty() : Optional.of(rows.get((int) index)); + }); + String expectedValues = expectedRows.filter(Optional::isPresent).map(Optional::get).collect(joining(", ")); + assertThat(expectedValues).isNotEmpty().as("Expected at least one delete operation to pass"); + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES " + expectedValues); } finally { executor.shutdownNow(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 3a4c87bac214..f5b58a0b2048 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -1378,6 +1378,7 @@ public void testUpdateWithSortOrder() "test_sorted_update", "WITH (sorted_by = ARRAY['comment']) AS TABLE tpch.tiny.lineitem WITH NO DATA")) { assertUpdate( + withSmallRowGroups, "INSERT INTO " + table.getName() + " TABLE tpch.tiny.lineitem", "VALUES 60175"); assertUpdate(withSmallRowGroups, "UPDATE " + table.getName() + " SET comment = substring(comment, 2)", 60175); @@ -1991,6 +1992,7 @@ public void testPartitionPredicatePushdownWithHistoricalPartitionSpecs() // The old partition scheme is no longer used so pushdown using the hour transform is allowed assertUpdate("DELETE FROM " + tableName + " WHERE year(d) = 1969", 3); + assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize"); assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3); assertThat(query(selectQuery)) .containsAll("VALUES 1, 8, 9, 10") diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index abb53731c964..019732d1af00 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -160,7 +160,6 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle 0, inputFile.length(), inputFile.length(), - 0, // This is incorrect, but the value is only used for delete operations ORC, PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), PartitionData.toJson(new PartitionData(new Object[] {})), diff --git 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergStatistics.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergStatistics.java index e6ebe9c6fe08..30bb2574bb8b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergStatistics.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergStatistics.java @@ -154,9 +154,9 @@ public void testAnalyzeWithSchemaEvolution() VALUES ('nationkey', null, 25, 0, null, '0', '24'), ('regionkey', null, 5, 0, null, '0', '4'), - ('name', 1314.0, 25, 0, null, null, null), - ('info', 4417.0, null, 0, null, null, null), - (null, null, null, null, 25, null, null)"""); + ('name', 1908.0, 25, 0, null, null, null), + ('info', null, null, null, null, null, null), + (null, null, null, null, 50, null, null)"""); assertUpdate("ANALYZE " + tableName); assertQuery( @@ -165,9 +165,9 @@ public void testAnalyzeWithSchemaEvolution() VALUES ('nationkey', null, 25, 0, null, '0', '24'), ('regionkey', null, 5, 0, null, '0', '4'), - ('name', 1314.0, 25, 0, null, null, null), - ('info', 4417.0, 25, 0, null, null, null), - (null, null, null, null, 25, null, null)"""); + ('name', 1908.0, 25, 0, null, null, null), + ('info', 4417.0, 25, 0.1, null, null, null), + (null, null, null, null, 50, null, null)"""); // Row count statistics do not yet account for position deletes assertUpdate("DROP TABLE " + tableName); } @@ -999,11 +999,11 @@ public void testStatsAfterDeletingAllRows() .projected("column_name", "distinct_values_count", "row_count") .skippingTypesCheck() .containsAll("VALUES " + - "('nationkey', DOUBLE '0.0', null), " + - "('name', DOUBLE '0.0', null), " + - "('regionkey', DOUBLE '0.0', null), " + - "('comment', DOUBLE '0.0', null), " + - "(null, null, DOUBLE '0.0')"); + "('nationkey', DOUBLE '25', null), " + + "('name', DOUBLE '25', null), " + + "('regionkey', DOUBLE '5', null), " + + "('comment', DOUBLE '25', null), " + + "(null, null, DOUBLE '25')"); } private long getCurrentSnapshotId(String tableName) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index dda302d94b7b..44d840408ace 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -18,6 +18,8 @@ import io.trino.Session; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; +import io.trino.plugin.base.util.Closables; +import io.trino.plugin.blackhole.BlackHolePlugin; import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; @@ -35,6 +37,7 @@ import io.trino.spi.type.TestingTypeManager; import io.trino.spi.type.TypeManager; import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; import io.trino.testing.sql.TestTable; import org.apache.hadoop.fs.Path; @@ -67,11 +70,18 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import 
java.util.stream.IntStream; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.io.MoreFiles.deleteRecursively; @@ -87,6 +97,8 @@ import static java.lang.String.format; import static java.nio.ByteOrder.LITTLE_ENDIAN; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.iceberg.FileFormat.ORC; import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.assertj.core.api.Assertions.assertThat; @@ -110,10 +122,21 @@ protected QueryRunner createQueryRunner() metastoreDir = tempDir.resolve("iceberg_data").toFile(); metastore = createTestingFileHiveMetastore(metastoreDir); - return IcebergQueryRunner.builder() + DistributedQueryRunner queryRunner = IcebergQueryRunner.builder() .setInitialTables(NATION) .setMetastoreDirectory(metastoreDir) .build(); + + try { + queryRunner.installPlugin(new BlackHolePlugin()); + queryRunner.createCatalog("blackhole", "blackhole"); + } + catch (RuntimeException e) { + Closables.closeAllSuppress(e, queryRunner); + throw e; + } + + return queryRunner; } @BeforeClass @@ -339,6 +362,98 @@ public void testOptimizingPartitionsOfV2TableWithGlobalEqualityDeleteFile() .toArray(String[]::new)); } + @Test + public void testOptimizeDuringWriteOperations() + throws Exception + { + runOptimizeDuringWriteOperations(true); + runOptimizeDuringWriteOperations(false); + } + + private void runOptimizeDuringWriteOperations(boolean useSmallFiles) + throws Exception + { + int threads = 5; + int deletionThreads = threads - 1; + int rows = 20; + int rowsPerThread = rows / deletionThreads; + + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + + // Slow down the delete operations so optimize is more likely to complete + String blackholeTable = "blackhole_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE blackhole.default.%s (a INT, b INT) WITH (split_count = 1, pages_per_split = 1, rows_per_page = 1, page_processing_delay = '1s')".formatted(blackholeTable)); + + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_optimize_during_write_operations", + "(int_col INT)")) { + String tableName = table.getName(); + + // Testing both situations where a file is fully removed by the delete operation and when a row level delete is required. 
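+ // Each DELETE below also scans the slow blackhole table through a subquery, so the deletes run long enough to overlap with the concurrent OPTIMIZE.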
+ if (useSmallFiles) { + for (int i = 0; i < rows; i++) { + assertUpdate(format("INSERT INTO %s VALUES %s", tableName, i), 1); + } + } + else { + String values = IntStream.range(0, rows).mapToObj(String::valueOf).collect(Collectors.joining(", ")); + assertUpdate(format("INSERT INTO %s VALUES %s", tableName, values), rows); + } + + List>> deletionFutures = IntStream.range(0, deletionThreads) + .mapToObj(threadNumber -> executor.submit(() -> { + barrier.await(10, SECONDS); + List successfulDeletes = new ArrayList<>(); + for (int i = 0; i < rowsPerThread; i++) { + try { + int rowNumber = threadNumber * rowsPerThread + i; + getQueryRunner().execute(format("DELETE FROM %s WHERE int_col = %s OR ((SELECT count(*) FROM blackhole.default.%s) > 42)", tableName, rowNumber, blackholeTable)); + successfulDeletes.add(true); + } + catch (RuntimeException e) { + successfulDeletes.add(false); + } + } + return successfulDeletes; + })) + .collect(toImmutableList()); + + Future optimizeFuture = executor.submit(() -> { + try { + barrier.await(10, SECONDS); + // Allow for some deletes to start before running optimize + Thread.sleep(50); + assertUpdate("ALTER TABLE %s EXECUTE optimize".formatted(tableName)); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }); + + List expectedValues = new ArrayList<>(); + for (int threadNumber = 0; threadNumber < deletionThreads; threadNumber++) { + List deleteOutcomes = deletionFutures.get(threadNumber).get(); + verify(deleteOutcomes.size() == rowsPerThread); + for (int rowNumber = 0; rowNumber < rowsPerThread; rowNumber++) { + boolean successfulDelete = deleteOutcomes.get(rowNumber); + if (!successfulDelete) { + expectedValues.add(String.valueOf(threadNumber * rowsPerThread + rowNumber)); + } + } + } + + optimizeFuture.get(); + assertThat(expectedValues.size()).isGreaterThan(0).isLessThan(rows); + assertQuery("SELECT * FROM " + tableName, "VALUES " + String.join(", ", expectedValues)); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + @Test public void testUpgradeTableToV2FromTrino() { @@ -433,7 +548,7 @@ public void testDeletingEntireFile() assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2); assertUpdate("DELETE FROM " + tableName + " WHERE regionkey <= 2", "SELECT count(*) FROM nation WHERE regionkey <= 2"); assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey > 2"); - assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1); + assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2); } @Test @@ -447,7 +562,7 @@ public void testDeletingEntireFileFromPartitionedTable() assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(4); assertUpdate("DELETE FROM " + tableName + " WHERE b % 2 = 0", 6); assertQuery("SELECT * FROM " + tableName, "VALUES (1, 1), (1, 3), (1, 5), (2, 1), (2, 3), (2, 5)"); - assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2); + assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(4); } @Test @@ -461,7 +576,7 @@ public void testDeletingEntireFileWithNonTupleDomainConstraint() assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2); assertUpdate("DELETE FROM " + tableName + " WHERE regionkey % 2 = 1", "SELECT count(*) FROM nation WHERE regionkey % 2 = 1"); assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey % 2 = 0"); - assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1); + 
assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(2); } @Test @@ -481,7 +596,7 @@ public void testDeletingEntireFileWithMultipleSplits() long parentSnapshotId = (long) computeScalar("SELECT parent_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES"); assertEquals(initialSnapshotId, parentSnapshotId); assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult(); - assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(0); + assertThat(this.loadTable(tableName).newScan().planFiles()).hasSize(1); } @Test
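
A note on the follow-up mentioned in the commit message: the removed whole-file delete optimization could later be reinstated on top of the OverwriteFiles API without giving up conflict detection. The sketch below is illustrative only; the helper name and parameters are hypothetical, and it assumes the conflict-validation methods exposed on OverwriteFiles by current Iceberg releases.

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.Transaction;
    import org.apache.iceberg.expressions.Expression;

    // Hypothetical sketch, not part of this patch: remove fully deleted data files
    // through OverwriteFiles so the removal is still validated against concurrent
    // commits, unlike DeleteFiles, which performs no conflict detection.
    static void removeFullyDeletedFiles(
            Transaction transaction,
            long baseSnapshotId,
            Expression conflictDetectionFilter,
            Iterable<DataFile> fullyDeletedFiles)
    {
        OverwriteFiles overwrite = transaction.newOverwrite();
        // Validate against the snapshot the query planned from, mirroring the
        // RowDelta validation that finishWrite performs in this change.
        overwrite.validateFromSnapshot(baseSnapshotId);
        overwrite.conflictDetectionFilter(conflictDetectionFilter);
        // Fail the commit if a concurrent transaction added overlapping data or
        // delete files (method availability depends on the Iceberg version in use).
        overwrite.validateNoConflictingData();
        overwrite.validateNoConflictingDeletes();
        fullyDeletedFiles.forEach(overwrite::deleteFile);
        overwrite.commit();
    }

Unlike DeleteFiles (transaction.newDelete()), this path keeps the validateFromSnapshot and conflict-detection semantics that finishWrite now gets from RowDelta, so a data file rewritten by a concurrent commit would fail validation instead of silently turning an update into an insert.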