From 0806741c6d1de1f9ed1d14f6ceef31019cc29d2f Mon Sep 17 00:00:00 2001 From: Michiel De Smet Date: Mon, 2 Dec 2024 16:20:14 +0800 Subject: [PATCH] Register VACUUM operations in the delta log --- .../plugin/deltalake/DeltaLakeConfig.java | 14 +++ .../deltalake/DeltaLakeSessionProperties.java | 11 +++ .../deltalake/procedure/VacuumProcedure.java | 88 +++++++++++++++++-- .../BaseDeltaLakeConnectorSmokeTest.java | 24 +++++ .../plugin/deltalake/TestDeltaLakeConfig.java | 7 +- 5 files changed, 135 insertions(+), 9 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 2438414cb124..d8ea1f41c9c4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -70,6 +70,7 @@ public class DeltaLakeConfig private long defaultCheckpointWritingInterval = 10; private boolean checkpointFilteringEnabled = true; private Duration vacuumMinRetention = new Duration(7, DAYS); + private boolean vacuumLoggingEnabled; private Optional hiveCatalogName = Optional.empty(); private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); private boolean tableStatisticsEnabled = true; @@ -280,6 +281,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention) return this; } + public boolean isVacuumLoggingEnabled() + { + return vacuumLoggingEnabled; + } + + @Config("delta.vacuum.logging.enabled") + @ConfigDescription("Whether to log vacuum information into the Delta transaction log") + public DeltaLakeConfig setVacuumLoggingEnabled(boolean vacuumLoggingEnabled) + { + this.vacuumLoggingEnabled = vacuumLoggingEnabled; + return this; + } + public Optional getHiveCatalogName() { return hiveCatalogName; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 01e744dc3047..dbb7fa8902d5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties { public static final String MAX_SPLIT_SIZE = "max_split_size"; public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention"; + public static final String VACUUM_LOGGING_ENABLED = "vacuum_logging_enabled"; private static final String HIVE_CATALOG_NAME = "hive_catalog_name"; private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size"; private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count"; @@ -96,6 +97,11 @@ public DeltaLakeSessionProperties( "Minimal retention period for vacuum procedure", deltaLakeConfig.getVacuumMinRetention(), false), + booleanProperty( + VACUUM_LOGGING_ENABLED, + "Vacuum logging enabled", + deltaLakeConfig.isVacuumLoggingEnabled(), + false), stringProperty( HIVE_CATALOG_NAME, "Catalog to redirect to when a Hive table is referenced", @@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session) return session.getProperty(VACUUM_MIN_RETENTION, Duration.class); } + public static boolean isVacuumLoggingEnabled(ConnectorSession session) + { + return session.getProperty(VACUUM_LOGGING_ENABLED, Boolean.class); + } + public static Optional getHiveCatalogName(ConnectorSession session) { return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class)); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index fe6b6b5eea79..7bb8cc64efe1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake.procedure; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import com.google.inject.Provider; @@ -32,11 +33,15 @@ import io.trino.plugin.deltalake.DeltaLakeSessionProperties; import io.trino.plugin.deltalake.DeltaLakeTableHandle; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; +import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter; +import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory; +import io.trino.spi.NodeManager; import io.trino.spi.TrinoException; import io.trino.spi.catalog.CatalogName; import io.trino.spi.classloader.ThreadContextClassLoader; @@ -68,7 +73,9 @@ import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat; import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; @@ -100,18 +107,26 @@ public class VacuumProcedure private final TrinoFileSystemFactory fileSystemFactory; private final DeltaLakeMetadataFactory metadataFactory; private final TransactionLogAccess transactionLogAccess; + private final TransactionLogWriterFactory transactionLogWriterFactory; + private final String nodeVersion; + private final String nodeId; @Inject public VacuumProcedure( CatalogName catalogName, TrinoFileSystemFactory fileSystemFactory, DeltaLakeMetadataFactory metadataFactory, - TransactionLogAccess transactionLogAccess) + TransactionLogAccess transactionLogAccess, + TransactionLogWriterFactory transactionLogWriterFactory, + NodeManager nodeManager) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null"); this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null"); + this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null"); + this.nodeVersion = nodeManager.getCurrentNode().getVersion(); + this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier(); } @Override @@ -166,6 +181,7 @@ private void doVacuum( Duration retentionDuration = Duration.valueOf(retention); Duration minRetention = getVacuumMinRetention(session); + boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session); checkProcedureArgument( retentionDuration.compareTo(minRetention) >= 0, "Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " + @@ -257,6 +273,8 @@ private void doVacuum( long retainedKnownFiles = 0; long retainedUnknownFiles = 0; List filesToDelete = new ArrayList<>(); + long filesToDeleteSize = 0; + int numActualFilesDeleted = 0; FileIterator listing = fileSystem.listFiles(Location.of(tableLocation)); while (listing.hasNext()) { @@ -300,17 +318,32 @@ private void doVacuum( Location fileLocation = Location.of(location); TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation); filesToDelete.add(inputFile); + filesToDeleteSize += inputFile.length(); + } + long readVersion = handle.getReadVersion(); + if (isVacuumLoggingEnabled) { + logVacuumStart(handle.location(), session, readVersion, filesToDelete.size(), filesToDeleteSize); } int totalFilesToDelete = filesToDelete.size(); int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE); - for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) { - int start = batchNumber * DELETE_BATCH_SIZE; - int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete); + String status = "FAILED"; + try { + for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) { + int start = batchNumber * DELETE_BATCH_SIZE; + int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete); - List batch = filesToDelete.subList(start, end); - fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList())); + List batch = filesToDelete.subList(start, end); + numActualFilesDeleted += batch.size(); + fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList())); + } + status = "COMPLETED"; + } + finally { + if (isVacuumLoggingEnabled) { + int numVacuumedDirectories = Set.of(filesToDelete.stream().map(f -> f.location().parentDirectory())).size(); + logVacuumEnd(handle.location(), session, readVersion, numActualFilesDeleted, numVacuumedDirectories, status); + } } - log.info( "[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s", queryId, @@ -323,4 +356,45 @@ private void doVacuum( totalFilesToDelete); } } + + private void logVacuumStart(String location, ConnectorSession session, long readVersion, long numFilesToDelete, long filesToDeleteSize) + throws IOException + { + long createdTime = System.currentTimeMillis(); + long commitVersion = readVersion + 1; + + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location); + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(commitVersion, createdTime, session, "VACUUM START", ImmutableMap.of("queryId", session.getQueryId()), ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)), readVersion)); + + transactionLogWriter.flush(); + } + + private void logVacuumEnd(String location, ConnectorSession session, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status) + throws IOException + { + long createdTime = System.currentTimeMillis(); + long commitVersion = readVersion + 2; + + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location); + transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(commitVersion, createdTime, session, "VACUUM END", ImmutableMap.of("queryId", session.getQueryId(), "status", status), ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)), readVersion)); + transactionLogWriter.flush(); + } + + private CommitInfoEntry getCommitInfoEntry(long commitVersion, long createdTime, ConnectorSession session, String operation, ImmutableMap operationParameters, ImmutableMap operationMetrics, long readVersion) + { + return new CommitInfoEntry( + commitVersion, + createdTime, + session.getUser(), + session.getUser(), + operation, + operationParameters, + null, + null, + "trino-" + nodeVersion + "-" + nodeId, + readVersion, + IsolationLevel.WRITESERIALIZABLE.getValue(), + Optional.of(true), + operationMetrics); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index b143ce622d3a..45bb9258c4df 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -230,6 +230,7 @@ private QueryRunner createDeltaLakeQueryRunner() .put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s") .put("delta.register-table-procedure.enabled", "true") .put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout + .put("delta.vacuum.logging.enabled", "true") .putAll(deltaStorageConfiguration()) .buildOrThrow()) .setSchemaLocation(getLocationForTable(bucketName, SCHEMA)) @@ -1716,6 +1717,9 @@ public void testVacuum() Session sessionWithShortRetentionUnlocked = Session.builder(getSession()) .setCatalogSessionProperty(catalog, "vacuum_min_retention", "0s") .build(); + Session sessionWithVacuumLoggingDisabled = Session.builder(sessionWithShortRetentionUnlocked) + .setCatalogSessionProperty(catalog, "vacuum_logging_enabled", "false") + .build(); assertUpdate( format("CREATE TABLE %s WITH (location = '%s', partitioned_by = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", tableName, tableLocation), 25); @@ -1746,6 +1750,26 @@ public void testVacuum() assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles); // old files should be cleaned up assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles); + // operations should be logged + assertThat(query("SELECT version, operation, MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId'), operation_metrics FROM \"" + tableName + "$history\"")).matches( """ + VALUES\s + (CAST(0 AS BIGINT), CAST('CREATE TABLE AS SELECT' AS VARCHAR), CAST(MAP() AS MAP(VARCHAR, VARCHAR)), CAST(MAP() AS MAP(VARCHAR, VARCHAR))),\s + (1, 'MERGE', MAP(), MAP()),\s + (2, 'VACUUM START', MAP(), MAP(ARRAY['sizeOfDataToDelete', 'numFilesToDelete'], ARRAY['0', '0'])),\s + (3, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED']), MAP(ARRAY['numDeletedFiles', 'numVacuumedDirectories'], ARRAY['0', '1'])),\s + (4, 'VACUUM START', MAP(), MAP(ARRAY['sizeOfDataToDelete', 'numFilesToDelete'], ARRAY['5025', '5'])),\s + (5, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED']), MAP(ARRAY['numDeletedFiles', 'numVacuumedDirectories'], ARRAY['5', '1']))"""); + + assertUpdate(sessionWithVacuumLoggingDisabled, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "', retention => '1s')"); + // no new vacuum logging operations are logged + assertQuery("SELECT version, operation FROM \"" + tableName + "$history\"", """ + VALUES\s + (0, 'CREATE TABLE AS SELECT'),\s + (1, 'MERGE'),\s + (2, 'VACUUM START'),\s + (3, 'VACUUM END'),\s + (4, 'VACUUM START'),\s + (5, 'VACUUM END')"""); } finally { assertUpdate("DROP TABLE " + tableName); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 1158f3ceeb83..45a800428e86 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -75,7 +75,8 @@ public void testDefaults() .setQueryPartitionFilterRequired(false) .setDeletionVectorsEnabled(false) .setDeltaLogFileSystemCacheDisabled(false) - .setMetadataParallelism(8)); + .setMetadataParallelism(8) + .setVacuumLoggingEnabled(false)); } @Test @@ -118,6 +119,7 @@ public void testExplicitPropertyMappings() .put("delta.deletion-vectors-enabled", "true") .put("delta.fs.cache.disable-transaction-log-caching", "true") .put("delta.metadata.parallelism", "10") + .put("delta.vacuum.logging.enabled", "true") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -156,7 +158,8 @@ public void testExplicitPropertyMappings() .setQueryPartitionFilterRequired(true) .setDeletionVectorsEnabled(true) .setDeltaLogFileSystemCacheDisabled(true) - .setMetadataParallelism(10); + .setMetadataParallelism(10) + .setVacuumLoggingEnabled(true); assertFullMapping(properties, expected); }