Register VACUUM operations in the delta log
mdesmet committed Dec 6, 2024
1 parent c69387c commit 0806741
Showing 5 changed files with 135 additions and 9 deletions.
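This commit adds a delta.vacuum.logging.enabled catalog property and a matching vacuum_logging_enabled session property, both false by default. When enabled, each vacuum call appends a VACUUM START and a VACUUM END commitInfo entry to the table's Delta transaction log. A minimal usage sketch; the catalog, schema, and table names below are placeholders, not taken from this commit:

    -- Toggle the new behavior for the current session
    SET SESSION delta.vacuum_logging_enabled = true;

    -- Run vacuum as usual; retention must still satisfy delta.vacuum.min-retention
    CALL delta.system.vacuum(schema_name => 'tpch', table_name => 'orders', retention => '7d');

    -- With logging enabled, the two extra operations show up in the table history
    SELECT version, operation, operation_metrics
    FROM delta.tpch."orders$history"
    WHERE operation IN ('VACUUM START', 'VACUUM END');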
@@ -70,6 +70,7 @@ public class DeltaLakeConfig
private long defaultCheckpointWritingInterval = 10;
private boolean checkpointFilteringEnabled = true;
private Duration vacuumMinRetention = new Duration(7, DAYS);
private boolean vacuumLoggingEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
@@ -280,6 +281,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
return this;
}

public boolean isVacuumLoggingEnabled()
{
return vacuumLoggingEnabled;
}

@Config("delta.vacuum.logging.enabled")
@ConfigDescription("Whether to log vacuum information into the Delta transaction log")
public DeltaLakeConfig setVacuumLoggingEnabled(boolean vacuumLoggingEnabled)
{
this.vacuumLoggingEnabled = vacuumLoggingEnabled;
return this;
}

public Optional<String> getHiveCatalogName()
{
return hiveCatalogName;
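The new knob follows the usual airlift @Config binding above, so it is set per catalog. A sketch of a catalog file enabling it; the file path and the connector.name line are standard Trino setup assumed here, not part of this diff:

    # etc/catalog/delta.properties
    connector.name=delta_lake
    delta.vacuum.logging.enabled=true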
@@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
{
public static final String MAX_SPLIT_SIZE = "max_split_size";
public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
public static final String VACUUM_LOGGING_ENABLED = "vacuum_logging_enabled";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
@@ -96,6 +97,11 @@ public DeltaLakeSessionProperties(
"Minimal retention period for vacuum procedure",
deltaLakeConfig.getVacuumMinRetention(),
false),
booleanProperty(
VACUUM_LOGGING_ENABLED,
"Whether to log vacuum information into the Delta transaction log",
deltaLakeConfig.isVacuumLoggingEnabled(),
false),
stringProperty(
HIVE_CATALOG_NAME,
"Catalog to redirect to when a Hive table is referenced",
@@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session)
return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
}

public static boolean isVacuumLoggingEnabled(ConnectorSession session)
{
return session.getProperty(VACUUM_LOGGING_ENABLED, Boolean.class);
}

public static Optional<String> getHiveCatalogName(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class));
@@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -32,11 +33,15 @@
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.classloader.ThreadContextClassLoader;
@@ -68,7 +73,9 @@
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
@@ -100,18 +107,26 @@ public class VacuumProcedure
private final TrinoFileSystemFactory fileSystemFactory;
private final DeltaLakeMetadataFactory metadataFactory;
private final TransactionLogAccess transactionLogAccess;
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final String nodeVersion;
private final String nodeId;

@Inject
public VacuumProcedure(
CatalogName catalogName,
TrinoFileSystemFactory fileSystemFactory,
DeltaLakeMetadataFactory metadataFactory,
TransactionLogAccess transactionLogAccess)
TransactionLogAccess transactionLogAccess,
TransactionLogWriterFactory transactionLogWriterFactory,
NodeManager nodeManager)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
this.nodeVersion = nodeManager.getCurrentNode().getVersion();
this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
}

@Override
@@ -166,6 +181,7 @@ private void doVacuum(

Duration retentionDuration = Duration.valueOf(retention);
Duration minRetention = getVacuumMinRetention(session);
boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session);
checkProcedureArgument(
retentionDuration.compareTo(minRetention) >= 0,
"Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " +
@@ -257,6 +273,8 @@ private void doVacuum(
long retainedKnownFiles = 0;
long retainedUnknownFiles = 0;
List<TrinoInputFile> filesToDelete = new ArrayList<>();
long filesToDeleteSize = 0;
int numActualFilesDeleted = 0;

FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
while (listing.hasNext()) {
@@ -300,17 +318,32 @@ private void doVacuum(
Location fileLocation = Location.of(location);
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
filesToDelete.add(inputFile);
filesToDeleteSize += inputFile.length();
}
long readVersion = handle.getReadVersion();
if (isVacuumLoggingEnabled) {
logVacuumStart(handle.location(), session, readVersion, filesToDelete.size(), filesToDeleteSize);
}
int totalFilesToDelete = filesToDelete.size();
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
int start = batchNumber * DELETE_BATCH_SIZE;
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
String status = "FAILED";
try {
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
int start = batchNumber * DELETE_BATCH_SIZE;
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);

List<TrinoInputFile> batch = filesToDelete.subList(start, end);
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
List<TrinoInputFile> batch = filesToDelete.subList(start, end);
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
// count a batch as deleted only after deleteFiles succeeds
numActualFilesDeleted += batch.size();
}
status = "COMPLETED";
}
finally {
if (isVacuumLoggingEnabled) {
// distinct parent directories of the files selected for deletion
int numVacuumedDirectories = (int) filesToDelete.stream().map(file -> file.location().parentDirectory()).distinct().count();
logVacuumEnd(handle.location(), session, readVersion, numActualFilesDeleted, numVacuumedDirectories, status);
}
}

log.info(
"[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
queryId,
@@ -323,4 +356,45 @@ private void doVacuum(
totalFilesToDelete);
}
}

private void logVacuumStart(String location, ConnectorSession session, long readVersion, long numFilesToDelete, long filesToDeleteSize)
throws IOException
{
long createdTime = System.currentTimeMillis();
long commitVersion = readVersion + 1;

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
        commitVersion,
        createdTime,
        session,
        "VACUUM START",
        ImmutableMap.of("queryId", session.getQueryId()),
        ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)),
        readVersion));

transactionLogWriter.flush();
}

private void logVacuumEnd(String location, ConnectorSession session, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
throws IOException
{
long createdTime = System.currentTimeMillis();
long commitVersion = readVersion + 2;

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
        commitVersion,
        createdTime,
        session,
        "VACUUM END",
        ImmutableMap.of("queryId", session.getQueryId(), "status", status),
        ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)),
        readVersion));
transactionLogWriter.flush();
}

private CommitInfoEntry getCommitInfoEntry(
        long commitVersion,
        long createdTime,
        ConnectorSession session,
        String operation,
        ImmutableMap<String, String> operationParameters,
        ImmutableMap<String, String> operationMetrics,
        long readVersion)
{
return new CommitInfoEntry(
commitVersion,
createdTime,
session.getUser(),
session.getUser(),
operation,
operationParameters,
null,
null,
"trino-" + nodeVersion + "-" + nodeId,
readVersion,
IsolationLevel.WRITESERIALIZABLE.getValue(),
Optional.of(true),
operationMetrics);
}
}
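Both entries go through the regular transaction log writer, so they land as ordinary commitInfo actions in _delta_log/<version>.json files at readVersion + 1 and readVersion + 2. An illustrative sketch of roughly what a VACUUM START line could look like, mapped field by field from getCommitInfoEntry above; the timestamp, user, query id, and cluster id values are invented, and the exact JSON layout depends on CommitInfoEntry's serialization, which this diff does not show:

    {"commitInfo":{"timestamp":1733500000000,"userId":"alice","userName":"alice","operation":"VACUUM START","operationParameters":{"queryId":"20241206_093000_00042_abcde"},"clusterId":"trino-436-node-1","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFilesToDelete":"5","sizeOfDataToDelete":"5025"}}}

The VACUUM END entry is analogous, carrying the status in operationParameters and numDeletedFiles/numVacuumedDirectories in operationMetrics.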
@@ -230,6 +230,7 @@ private QueryRunner createDeltaLakeQueryRunner()
.put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("delta.register-table-procedure.enabled", "true")
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("delta.vacuum.logging.enabled", "true")
.putAll(deltaStorageConfiguration())
.buildOrThrow())
.setSchemaLocation(getLocationForTable(bucketName, SCHEMA))
@@ -1716,6 +1717,9 @@ public void testVacuum()
Session sessionWithShortRetentionUnlocked = Session.builder(getSession())
.setCatalogSessionProperty(catalog, "vacuum_min_retention", "0s")
.build();
Session sessionWithVacuumLoggingDisabled = Session.builder(sessionWithShortRetentionUnlocked)
.setCatalogSessionProperty(catalog, "vacuum_logging_enabled", "false")
.build();
assertUpdate(
format("CREATE TABLE %s WITH (location = '%s', partitioned_by = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", tableName, tableLocation),
25);
@@ -1746,6 +1750,26 @@ public void testVacuum()
assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
// old files should be cleaned up
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
// operations should be logged
assertThat(query("SELECT version, operation, MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId'), operation_metrics FROM \"" + tableName + "$history\"")).matches("""
VALUES\s
(CAST(0 AS BIGINT), CAST('CREATE TABLE AS SELECT' AS VARCHAR), CAST(MAP() AS MAP(VARCHAR, VARCHAR)), CAST(MAP() AS MAP(VARCHAR, VARCHAR))),\s
(1, 'MERGE', MAP(), MAP()),\s
(2, 'VACUUM START', MAP(), MAP(ARRAY['sizeOfDataToDelete', 'numFilesToDelete'], ARRAY['0', '0'])),\s
(3, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED']), MAP(ARRAY['numDeletedFiles', 'numVacuumedDirectories'], ARRAY['0', '0'])),\s
(4, 'VACUUM START', MAP(), MAP(ARRAY['sizeOfDataToDelete', 'numFilesToDelete'], ARRAY['5025', '5'])),\s
(5, 'VACUUM END', MAP(ARRAY['status'], ARRAY['COMPLETED']), MAP(ARRAY['numDeletedFiles', 'numVacuumedDirectories'], ARRAY['5', '5']))""");

assertUpdate(sessionWithVacuumLoggingDisabled, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "', retention => '1s')");
// no new vacuum logging operations are logged
assertQuery("SELECT version, operation FROM \"" + tableName + "$history\"", """
VALUES\s
(0, 'CREATE TABLE AS SELECT'),\s
(1, 'MERGE'),\s
(2, 'VACUUM START'),\s
(3, 'VACUUM END'),\s
(4, 'VACUUM START'),\s
(5, 'VACUUM END')""");
}
finally {
assertUpdate("DROP TABLE " + tableName);
@@ -75,7 +75,8 @@ public void testDefaults()
.setQueryPartitionFilterRequired(false)
.setDeletionVectorsEnabled(false)
.setDeltaLogFileSystemCacheDisabled(false)
.setMetadataParallelism(8));
.setMetadataParallelism(8)
.setVacuumLoggingEnabled(false));
}

@Test
@@ -118,6 +119,7 @@ public void testExplicitPropertyMappings()
.put("delta.deletion-vectors-enabled", "true")
.put("delta.fs.cache.disable-transaction-log-caching", "true")
.put("delta.metadata.parallelism", "10")
.put("delta.vacuum.logging.enabled", "true")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -156,7 +158,8 @@ public void testExplicitPropertyMappings()
.setQueryPartitionFilterRequired(true)
.setDeletionVectorsEnabled(true)
.setDeltaLogFileSystemCacheDisabled(true)
.setMetadataParallelism(10);
.setMetadataParallelism(10)
.setVacuumLoggingEnabled(true);

assertFullMapping(properties, expected);
}
