diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 2438414cb124..22492457dabe 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -91,6 +91,7 @@ public class DeltaLakeConfig private boolean deletionVectorsEnabled; private boolean deltaLogFileSystemCacheDisabled; private int metadataParallelism = 8; + private boolean vacuumLoggingEnabled; public Duration getMetadataCacheTtl() { @@ -581,4 +582,17 @@ public DeltaLakeConfig setMetadataParallelism(int metadataParallelism) this.metadataParallelism = metadataParallelism; return this; } + + public boolean isVacuumLoggingEnabled() + { + return vacuumLoggingEnabled; + } + + @Config("delta.vacuum.logging.enabled") + @ConfigDescription("Whether to log vacuum information into the Delta transaction log") + public DeltaLakeConfig setVacuumLoggingEnabled(boolean vacuumLoggingEnabled) + { + this.vacuumLoggingEnabled = vacuumLoggingEnabled; + return this; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index 2ec61528d6cb..41489c6934cb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake.procedure; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import com.google.inject.Provider; @@ -24,6 +25,7 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.DeltaLakeMetadata; @@ -31,11 +33,15 @@ import io.trino.plugin.deltalake.DeltaLakeSessionProperties; import io.trino.plugin.deltalake.DeltaLakeTableHandle; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; +import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter; +import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory; +import io.trino.spi.NodeManager; import io.trino.spi.TrinoException; import io.trino.spi.catalog.CatalogName; import io.trino.spi.classloader.ThreadContextClassLoader; @@ -68,6 +74,7 @@ import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; @@ -99,18 +106,29 @@ public class VacuumProcedure private final TrinoFileSystemFactory fileSystemFactory; private final DeltaLakeMetadataFactory metadataFactory; private final TransactionLogAccess transactionLogAccess; + private final TransactionLogWriterFactory transactionLogWriterFactory; + private final DeltaLakeConfig deltaLakeConfig; + private final String nodeVersion; + private final String nodeId; @Inject public VacuumProcedure( CatalogName catalogName, TrinoFileSystemFactory fileSystemFactory, DeltaLakeMetadataFactory metadataFactory, - TransactionLogAccess transactionLogAccess) + TransactionLogAccess transactionLogAccess, + TransactionLogWriterFactory transactionLogWriterFactory, + NodeManager nodeManager, + DeltaLakeConfig deltaLakeConfig) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null"); this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null"); + this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null"); + this.nodeVersion = nodeManager.getCurrentNode().getVersion(); + this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier(); + this.deltaLakeConfig = requireNonNull(deltaLakeConfig, "deltaLakeConfig is null"); } @Override @@ -255,9 +273,9 @@ private void doVacuum( long transactionLogFiles = 0; long retainedKnownFiles = 0; long retainedUnknownFiles = 0; - long removedFiles = 0; + List filesToDelete = new ArrayList<>(); + long filesToDeleteSize = 0; - List filesToDelete = new ArrayList<>(); FileIterator listing = fileSystem.listFiles(Location.of(tableLocation)); while (listing.hasNext()) { FileEntry entry = listing.next(); @@ -295,21 +313,32 @@ private void doVacuum( retainedUnknownFiles++; continue; } - log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime); - filesToDelete.add(entry.location()); - if (filesToDelete.size() == DELETE_BATCH_SIZE) { - fileSystem.deleteFiles(filesToDelete); - removedFiles += filesToDelete.size(); - filesToDelete.clear(); - } - } - if (!filesToDelete.isEmpty()) { - fileSystem.deleteFiles(filesToDelete); - removedFiles += filesToDelete.size(); + Location fileLocation = Location.of(location); + TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation); + filesToDelete.add(inputFile); + filesToDeleteSize += inputFile.length(); } + long readVersion = handle.getReadVersion(); + logVacuumStart(handle, session, readVersion, readVersion + 1, filesToDelete.size(), filesToDeleteSize); + int totalFilesToDelete = filesToDelete.size(); + int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE); + String status = "FAILED"; + try { + for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) { + int start = batchNumber * DELETE_BATCH_SIZE; + int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete); + List batch = filesToDelete.subList(start, end); + fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList())); + } + status = "COMPLETED"; + } + finally { + int numVacuumedDirectories = Set.of(filesToDelete.stream().map(f -> f.location().parentDirectory())).size(); + logVacuumEnd(handle, session, readVersion, readVersion + 2, filesToDelete.size(), numVacuumedDirectories, status); + } log.info( "[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s", queryId, @@ -319,7 +348,60 @@ private void doVacuum( transactionLogFiles, retainedKnownFiles, retainedUnknownFiles, - removedFiles); + totalFilesToDelete); + } + } + + private void logVacuumStart(DeltaLakeTableHandle handle, ConnectorSession session, long readVersion, long commitVersion, long numFilesToDelete, long filesToDeleteSize) + throws IOException + { + if (!deltaLakeConfig.isVacuumLoggingEnabled()) { + return; } + + long createdTime = System.currentTimeMillis(); + + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.location()); + transactionLogWriter.appendCommitInfoEntry(new CommitInfoEntry( + commitVersion, + createdTime, + session.getUser(), + session.getUser(), + "VacuumStart", + ImmutableMap.of("queryId", session.getQueryId(), "numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)), + null, + null, + "trino-" + nodeVersion + "-" + nodeId, + readVersion, + IsolationLevel.WRITESERIALIZABLE.getValue(), + Optional.of(false))); // TODO: blindAppend false OK? + + transactionLogWriter.flush(); + } + + private void logVacuumEnd(DeltaLakeTableHandle handle, ConnectorSession session, long readVersion, long commitVersion, int numDeletedFiles, int numVacuumedDirectories, String status) + throws IOException + { + if (!deltaLakeConfig.isVacuumLoggingEnabled()) { + return; + } + + long createdTime = System.currentTimeMillis(); + + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.location()); + transactionLogWriter.appendCommitInfoEntry(new CommitInfoEntry( + commitVersion, + createdTime, + session.getUser(), + session.getUser(), + "VacuumEnd", + ImmutableMap.of("queryId", session.getQueryId(), "numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories), "status", status), + null, + null, + "trino-" + nodeVersion + "-" + nodeId, + readVersion, + IsolationLevel.WRITESERIALIZABLE.getValue(), + Optional.of(false))); // TODO: blindAppend false OK? + transactionLogWriter.flush(); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index b143ce622d3a..e47234905946 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -1746,6 +1746,9 @@ public void testVacuum() assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles); // old files should be cleaned up assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles); + // operations should be logged + assertQuery("SELECT version, operation FROM \"" + tableName + "$history\"", "VALUES (0, 'CREATE TABLE AS SELECT'), (1, 'MERGE'), (2, 'VacuumStart'), (3, 'VacuumEnd'), (4, 'VacuumStart'), (5, 'VacuumEnd')"); + } finally { assertUpdate("DROP TABLE " + tableName); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 1158f3ceeb83..45a800428e86 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -75,7 +75,8 @@ public void testDefaults() .setQueryPartitionFilterRequired(false) .setDeletionVectorsEnabled(false) .setDeltaLogFileSystemCacheDisabled(false) - .setMetadataParallelism(8)); + .setMetadataParallelism(8) + .setVacuumLoggingEnabled(false)); } @Test @@ -118,6 +119,7 @@ public void testExplicitPropertyMappings() .put("delta.deletion-vectors-enabled", "true") .put("delta.fs.cache.disable-transaction-log-caching", "true") .put("delta.metadata.parallelism", "10") + .put("delta.vacuum.logging.enabled", "true") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -156,7 +158,8 @@ public void testExplicitPropertyMappings() .setQueryPartitionFilterRequired(true) .setDeletionVectorsEnabled(true) .setDeltaLogFileSystemCacheDisabled(true) - .setMetadataParallelism(10); + .setMetadataParallelism(10) + .setVacuumLoggingEnabled(true); assertFullMapping(properties, expected); }