Skip to content

Commit

Permalink
Register VACUUM operations in the delta log
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet committed Dec 5, 2024
1 parent eb29dda commit 6c315fa
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class DeltaLakeConfig
private long defaultCheckpointWritingInterval = 10;
private boolean checkpointFilteringEnabled = true;
private Duration vacuumMinRetention = new Duration(7, DAYS);
private boolean vacuumLoggingEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
Expand Down Expand Up @@ -280,6 +281,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
return this;
}

/**
 * Returns whether VACUUM operations should be logged into the Delta transaction log.
 * The backing field has no initializer, so the value is {@code false} until
 * {@link #setVacuumLoggingEnabled(boolean)} is called.
 */
public boolean isVacuumLoggingEnabled()
{
    return this.vacuumLoggingEnabled;
}

/**
 * Enables or disables logging of vacuum information into the Delta transaction log.
 * Bound to the {@code delta.vacuum.logging.enabled} configuration property.
 *
 * @param vacuumLoggingEnabled the new value of the flag
 * @return this config instance, so setters can be chained
 */
@Config("delta.vacuum.logging.enabled")
@ConfigDescription("Whether to log vacuum information into the Delta transaction log")
public DeltaLakeConfig setVacuumLoggingEnabled(boolean vacuumLoggingEnabled)
{
    this.vacuumLoggingEnabled = vacuumLoggingEnabled;
    return this;
}

public Optional<String> getHiveCatalogName()
{
return hiveCatalogName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
Expand All @@ -24,18 +25,23 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.base.util.UncheckedCloseable;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeMetadata;
import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.classloader.ThreadContextClassLoader;
Expand Down Expand Up @@ -68,6 +74,7 @@
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
Expand Down Expand Up @@ -99,18 +106,29 @@ public class VacuumProcedure
// Collaborators and settings supplied through the @Inject constructor.
private final TrinoFileSystemFactory fileSystemFactory;
private final DeltaLakeMetadataFactory metadataFactory;
private final TransactionLogAccess transactionLogAccess;
// Used by logVacuumStart/logVacuumEnd to append commit info entries to the transaction log.
private final TransactionLogWriterFactory transactionLogWriterFactory;
// Read once from DeltaLakeConfig in the constructor and never reassigned, hence final.
private final boolean vacuumLoggingEnabled;
// Node version and identifier; combined into the "trino-<version>-<nodeId>" string written with each commit info entry.
private final String nodeVersion;
private final String nodeId;

/**
 * Creates the vacuum procedure from its injected collaborators.
 *
 * @param catalogName name of the catalog this procedure belongs to
 * @param fileSystemFactory factory for the file system used to list and delete files
 * @param metadataFactory factory for Delta Lake metadata instances
 * @param transactionLogAccess accessor for the table's transaction log
 * @param transactionLogWriterFactory factory for writers that append VACUUM entries to the transaction log
 * @param nodeManager source of the current node's version and identifier
 * @param deltaLakeConfig connector configuration; only the vacuum logging flag is read here
 * @throws NullPointerException if any argument is null
 */
@Inject
public VacuumProcedure(
        CatalogName catalogName,
        TrinoFileSystemFactory fileSystemFactory,
        DeltaLakeMetadataFactory metadataFactory,
        TransactionLogAccess transactionLogAccess,
        TransactionLogWriterFactory transactionLogWriterFactory,
        NodeManager nodeManager,
        DeltaLakeConfig deltaLakeConfig)
{
    this.catalogName = requireNonNull(catalogName, "catalogName is null");
    this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
    this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
    this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
    this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
    // Validate explicitly so a missing binding fails with a named message, like the arguments above.
    requireNonNull(nodeManager, "nodeManager is null");
    requireNonNull(deltaLakeConfig, "deltaLakeConfig is null");
    this.nodeVersion = nodeManager.getCurrentNode().getVersion();
    this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
    this.vacuumLoggingEnabled = deltaLakeConfig.isVacuumLoggingEnabled();
}

@Override
Expand Down Expand Up @@ -255,9 +273,9 @@ private void doVacuum(
long transactionLogFiles = 0;
long retainedKnownFiles = 0;
long retainedUnknownFiles = 0;
long removedFiles = 0;
List<TrinoInputFile> filesToDelete = new ArrayList<>();
long filesToDeleteSize = 0;

List<Location> filesToDelete = new ArrayList<>();
FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
while (listing.hasNext()) {
FileEntry entry = listing.next();
Expand Down Expand Up @@ -295,21 +313,32 @@ private void doVacuum(
retainedUnknownFiles++;
continue;
}

log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime);
filesToDelete.add(entry.location());
if (filesToDelete.size() == DELETE_BATCH_SIZE) {
fileSystem.deleteFiles(filesToDelete);
removedFiles += filesToDelete.size();
filesToDelete.clear();
}
}

if (!filesToDelete.isEmpty()) {
fileSystem.deleteFiles(filesToDelete);
removedFiles += filesToDelete.size();
Location fileLocation = Location.of(location);
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
filesToDelete.add(inputFile);
filesToDeleteSize += inputFile.length();
}
long readVersion = handle.getReadVersion();
logVacuumStart(handle.location(), session, readVersion, filesToDelete.size(), filesToDeleteSize);
int totalFilesToDelete = filesToDelete.size();
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
String status = "FAILED";
try {
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
int start = batchNumber * DELETE_BATCH_SIZE;
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);

List<TrinoInputFile> batch = filesToDelete.subList(start, end);
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
}
status = "COMPLETED";
}
finally {
int numVacuumedDirectories = Set.of(filesToDelete.stream().map(f -> f.location().parentDirectory())).size();
logVacuumEnd(handle.location(), session, readVersion, filesToDelete.size(), numVacuumedDirectories, status);
}
log.info(
"[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
queryId,
Expand All @@ -319,7 +348,56 @@ private void doVacuum(
transactionLogFiles,
retainedKnownFiles,
retainedUnknownFiles,
removedFiles);
totalFilesToDelete);
}
}

/**
 * Appends a "VACUUM START" commit info entry to the table's transaction log.
 * Does nothing when vacuum logging is disabled.
 *
 * @param location table location handed to the transaction log writer factory
 * @param session session supplying the user and query id recorded in the entry
 * @param readVersion table version the vacuum read; the entry is written as {@code readVersion + 1}
 * @param numFilesToDelete number of files selected for deletion, recorded as a metric
 * @param filesToDeleteSize value recorded as the {@code sizeOfDataToDelete} metric
 * @throws IOException declared for the transaction log write (which call raises it is not visible here)
 */
private void logVacuumStart(String location, ConnectorSession session, long readVersion, long numFilesToDelete, long filesToDeleteSize)
        throws IOException
{
    if (vacuumLoggingEnabled) {
        long timestamp = System.currentTimeMillis();
        long version = readVersion + 1;
        TransactionLogWriter writer = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);

        // The start entry carries no operation parameters, only metrics.
        ImmutableMap<String, String> parameters = ImmutableMap.of();
        ImmutableMap<String, String> metrics = ImmutableMap.of(
                "queryId", session.getQueryId(),
                "numFilesToDelete", String.valueOf(numFilesToDelete),
                "sizeOfDataToDelete", String.valueOf(filesToDeleteSize));

        CommitInfoEntry entry = getCommitInfoEntry(version, timestamp, session, "VACUUM START", parameters, metrics, readVersion);
        writer.appendCommitInfoEntry(entry);
        writer.flush();
    }
}

/**
 * Appends a "VACUUM END" commit info entry to the table's transaction log.
 * Does nothing when vacuum logging is disabled.
 *
 * @param location table location handed to the transaction log writer factory
 * @param session session supplying the user and query id recorded in the entry
 * @param readVersion table version the vacuum read; the entry is written as {@code readVersion + 2}
 * @param numDeletedFiles value recorded as the {@code numDeletedFiles} metric
 * @param numVacuumedDirectories value recorded as the {@code numVacuumedDirectories} metric
 * @param status outcome recorded as the {@code status} operation parameter
 * @throws IOException declared for the transaction log write (which call raises it is not visible here)
 */
private void logVacuumEnd(String location, ConnectorSession session, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
        throws IOException
{
    if (vacuumLoggingEnabled) {
        long timestamp = System.currentTimeMillis();
        // NOTE(review): +2 presumably leaves readVersion + 1 for the matching "VACUUM START" entry — confirm no other commit can land in between.
        long version = readVersion + 2;
        TransactionLogWriter writer = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);

        ImmutableMap<String, String> parameters = ImmutableMap.of("status", status);
        ImmutableMap<String, String> metrics = ImmutableMap.of(
                "queryId", session.getQueryId(),
                "numDeletedFiles", String.valueOf(numDeletedFiles),
                "numVacuumedDirectories", String.valueOf(numVacuumedDirectories));

        CommitInfoEntry entry = getCommitInfoEntry(version, timestamp, session, "VACUUM END", parameters, metrics, readVersion);
        writer.appendCommitInfoEntry(entry);
        writer.flush();
    }
}

/**
 * Builds the {@link CommitInfoEntry} written for a VACUUM log record.
 *
 * @param commitVersion version passed as the entry's first constructor argument
 * @param createdTime timestamp passed as the entry's second constructor argument
 * @param session session whose user is recorded (passed twice, in two adjacent constructor slots)
 * @param operation operation name, e.g. "VACUUM START" or "VACUUM END"
 * @param operationParameters operation parameters to record
 * @param operationMetrics operation metrics to record
 * @param readVersion table version the operation read
 * @return the assembled commit info entry
 */
private CommitInfoEntry getCommitInfoEntry(long commitVersion, long createdTime, ConnectorSession session, String operation, ImmutableMap<String, String> operationParameters, ImmutableMap<String, String> operationMetrics, long readVersion)
{
    String user = session.getUser();
    // Identifies the writing engine instance by node version and node id.
    String engineIdentifier = "trino-" + nodeVersion + "-" + nodeId;

    // NOTE(review): the two nulls fill constructor slots whose meaning is not visible in this file — confirm against CommitInfoEntry.
    return new CommitInfoEntry(
            commitVersion,
            createdTime,
            user,
            user,
            operation,
            operationParameters,
            null,
            null,
            engineIdentifier,
            readVersion,
            IsolationLevel.WRITESERIALIZABLE.getValue(),
            Optional.of(true),
            operationMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ private QueryRunner createDeltaLakeQueryRunner()
.put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("delta.register-table-procedure.enabled", "true")
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("delta.vacuum.logging.enabled", "true")
.putAll(deltaStorageConfiguration())
.buildOrThrow())
.setSchemaLocation(getLocationForTable(bucketName, SCHEMA))
Expand Down Expand Up @@ -1746,6 +1747,15 @@ public void testVacuum()
assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
// old files should be cleaned up
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
// operations should be logged
assertQuery("SELECT version, operation, CAST(MAP_FILTER(operation_parameters, (k, v) -> k <> 'queryId') AS JSON), CAST(MAP_FILTER(operation_metrics, (k, v) -> k <> 'queryId') AS JSON) FROM \"" + tableName + "$history\"", """
VALUES\s
(0, 'CREATE TABLE AS SELECT', '{}', '{}'),\s
(1, 'MERGE', '{}', '{}'),\s
(2, 'VACUUM START', '{}', '{"sizeOfDataToDelete":"0","numFilesToDelete":"0"}'),\s
(3, 'VACUUM END', '{"status":"COMPLETED"}', '{"numDeletedFiles":"0","numVacuumedDirectories":"1"}'),\s
(4, 'VACUUM START', '{}', '{"sizeOfDataToDelete":"5025","numFilesToDelete":"5"}'),\s
(5, 'VACUUM END', '{"status":"COMPLETED"}', '{"numDeletedFiles":"5","numVacuumedDirectories":"1"}')""");
}
finally {
assertUpdate("DROP TABLE " + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void testDefaults()
.setQueryPartitionFilterRequired(false)
.setDeletionVectorsEnabled(false)
.setDeltaLogFileSystemCacheDisabled(false)
.setMetadataParallelism(8));
.setMetadataParallelism(8)
.setVacuumLoggingEnabled(false));
}

@Test
Expand Down Expand Up @@ -118,6 +119,7 @@ public void testExplicitPropertyMappings()
.put("delta.deletion-vectors-enabled", "true")
.put("delta.fs.cache.disable-transaction-log-caching", "true")
.put("delta.metadata.parallelism", "10")
.put("delta.vacuum.logging.enabled", "true")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
Expand Down Expand Up @@ -156,7 +158,8 @@ public void testExplicitPropertyMappings()
.setQueryPartitionFilterRequired(true)
.setDeletionVectorsEnabled(true)
.setDeltaLogFileSystemCacheDisabled(true)
.setMetadataParallelism(10);
.setMetadataParallelism(10)
.setVacuumLoggingEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimeWithTimeZoneType;
import io.trino.spi.type.TimestampType;
Expand All @@ -51,7 +52,9 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down

0 comments on commit 6c315fa

Please sign in to comment.