Skip to content

Commit

Permalink
Register VACUUM operations in the delta log
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet committed Dec 3, 2024
1 parent 261f254 commit 1ae966f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class DeltaLakeConfig
private boolean deletionVectorsEnabled;
private boolean deltaLogFileSystemCacheDisabled;
private int metadataParallelism = 8;
private boolean vacuumLoggingEnabled;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -581,4 +582,17 @@ public DeltaLakeConfig setMetadataParallelism(int metadataParallelism)
this.metadataParallelism = metadataParallelism;
return this;
}

/**
 * Reports whether vacuum logging is switched on.
 *
 * @return the current value of the {@code vacuumLoggingEnabled} setting
 */
public boolean isVacuumLoggingEnabled()
{
    return this.vacuumLoggingEnabled;
}

/**
 * Sets whether vacuum information is logged into the Delta transaction log.
 * Bound to the {@code delta.vacuum.logging.enabled} configuration property.
 *
 * @param vacuumLoggingEnabled the new value of the setting
 * @return this config object, so that setters can be chained
 */
@Config("delta.vacuum.logging.enabled")
@ConfigDescription("Whether to log vacuum information into the Delta transaction log")
public DeltaLakeConfig setVacuumLoggingEnabled(boolean vacuumLoggingEnabled)
{
    this.vacuumLoggingEnabled = vacuumLoggingEnabled;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
Expand All @@ -24,18 +25,23 @@
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.base.util.UncheckedCloseable;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeMetadata;
import io.trino.plugin.deltalake.DeltaLakeMetadataFactory;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.classloader.ThreadContextClassLoader;
Expand Down Expand Up @@ -68,6 +74,7 @@
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
Expand Down Expand Up @@ -99,18 +106,29 @@ public class VacuumProcedure
private final TrinoFileSystemFactory fileSystemFactory;
private final DeltaLakeMetadataFactory metadataFactory;
private final TransactionLogAccess transactionLogAccess;
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final DeltaLakeConfig deltaLakeConfig;
private final String nodeVersion;
private final String nodeId;

/**
 * Creates the vacuum procedure from its injected collaborators.
 *
 * @param catalogName name of the catalog this procedure belongs to
 * @param fileSystemFactory factory for the file systems used to list and delete files
 * @param metadataFactory factory for Delta Lake metadata instances
 * @param transactionLogAccess accessor for the table transaction log
 * @param transactionLogWriterFactory factory for the writers used to record vacuum commits
 * @param nodeManager source of the current node's version and identifier
 * @param deltaLakeConfig connector configuration, consulted for the vacuum logging flag
 * @throws NullPointerException if any argument is null
 */
@Inject
public VacuumProcedure(
        CatalogName catalogName,
        TrinoFileSystemFactory fileSystemFactory,
        DeltaLakeMetadataFactory metadataFactory,
        TransactionLogAccess transactionLogAccess,
        TransactionLogWriterFactory transactionLogWriterFactory,
        NodeManager nodeManager,
        DeltaLakeConfig deltaLakeConfig)
{
    this.catalogName = requireNonNull(catalogName, "catalogName is null");
    this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
    this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
    this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
    this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
    // Validate nodeManager like every other dependency, so a missing binding fails
    // with a descriptive message instead of a bare NullPointerException below.
    requireNonNull(nodeManager, "nodeManager is null");
    this.nodeVersion = nodeManager.getCurrentNode().getVersion();
    this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
    this.deltaLakeConfig = requireNonNull(deltaLakeConfig, "deltaLakeConfig is null");
}

@Override
Expand Down Expand Up @@ -255,9 +273,9 @@ private void doVacuum(
long transactionLogFiles = 0;
long retainedKnownFiles = 0;
long retainedUnknownFiles = 0;
long removedFiles = 0;
List<TrinoInputFile> filesToDelete = new ArrayList<>();
long filesToDeleteSize = 0;

List<Location> filesToDelete = new ArrayList<>();
FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
while (listing.hasNext()) {
FileEntry entry = listing.next();
Expand Down Expand Up @@ -295,21 +313,32 @@ private void doVacuum(
retainedUnknownFiles++;
continue;
}

log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime);
filesToDelete.add(entry.location());
if (filesToDelete.size() == DELETE_BATCH_SIZE) {
fileSystem.deleteFiles(filesToDelete);
removedFiles += filesToDelete.size();
filesToDelete.clear();
}
}

if (!filesToDelete.isEmpty()) {
fileSystem.deleteFiles(filesToDelete);
removedFiles += filesToDelete.size();
Location fileLocation = Location.of(location);
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
filesToDelete.add(inputFile);
filesToDeleteSize += inputFile.length();
}
long readVersion = handle.getReadVersion();
logVacuumStart(handle, session, readVersion, readVersion + 1, filesToDelete.size(), filesToDeleteSize);
int totalFilesToDelete = filesToDelete.size();
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
String status = "FAILED";
try {
    // Delete in fixed-size batches so a single deleteFiles call never receives
    // more than DELETE_BATCH_SIZE locations.
    for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
        int start = batchNumber * DELETE_BATCH_SIZE;
        int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);

        List<TrinoInputFile> batch = filesToDelete.subList(start, end);
        fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
    }
    status = "COMPLETED";
}
finally {
    // Count the distinct parent directories of the files selected for deletion.
    // Wrapping the stream in Set.of(...) created a one-element set containing the
    // Stream object itself, so the reported directory count was always 1.
    int numVacuumedDirectories = (int) filesToDelete.stream()
            .map(file -> file.location().parentDirectory())
            .distinct()
            .count();
    // The end entry is written even when deletion failed; status is still "FAILED" in that case.
    logVacuumEnd(handle, session, readVersion, readVersion + 2, filesToDelete.size(), numVacuumedDirectories, status);
}
log.info(
"[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
queryId,
Expand All @@ -319,7 +348,53 @@ private void doVacuum(
transactionLogFiles,
retainedKnownFiles,
retainedUnknownFiles,
removedFiles);
totalFilesToDelete);
}
}

/**
 * Appends a "VACUUM START" commit info entry to the table's transaction log and flushes it.
 * Nothing is written when vacuum logging is disabled in the connector configuration.
 *
 * @param handle table being vacuumed; its location selects the transaction log to write to
 * @param session session running the vacuum; supplies the query id recorded in the entry
 * @param readVersion table version the vacuum was planned against
 * @param commitVersion version under which the entry is written
 * @param numFilesToDelete number of files selected for deletion
 * @param filesToDeleteSize combined size of the files selected for deletion
 * @throws IOException declared for failures while writing the transaction log
 */
private void logVacuumStart(DeltaLakeTableHandle handle, ConnectorSession session, long readVersion, long commitVersion, long numFilesToDelete, long filesToDeleteSize)
        throws IOException
{
    if (deltaLakeConfig.isVacuumLoggingEnabled()) {
        long timestamp = System.currentTimeMillis();
        TransactionLogWriter writer = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.location());

        ImmutableMap<String, String> details = ImmutableMap.of(
                "queryId", session.getQueryId(),
                "numFilesToDelete", String.valueOf(numFilesToDelete),
                "sizeOfDataToDelete", String.valueOf(filesToDeleteSize));
        CommitInfoEntry startEntry = getCommitInfoEntry(commitVersion, timestamp, session, "VACUUM START", details, readVersion);

        writer.appendCommitInfoEntry(startEntry);
        writer.flush();
    }
}

/**
 * Appends a "VACUUM END" commit info entry to the table's transaction log and flushes it.
 * Nothing is written when vacuum logging is disabled in the connector configuration.
 *
 * @param handle table being vacuumed; its location selects the transaction log to write to
 * @param session session running the vacuum; supplies the query id recorded in the entry
 * @param readVersion table version the vacuum was planned against
 * @param commitVersion version under which the entry is written
 * @param numDeletedFiles number of files reported as deleted
 * @param numVacuumedDirectories number of directories reported as vacuumed
 * @param status outcome label recorded in the entry
 * @throws IOException declared for failures while writing the transaction log
 */
private void logVacuumEnd(DeltaLakeTableHandle handle, ConnectorSession session, long readVersion, long commitVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
        throws IOException
{
    if (deltaLakeConfig.isVacuumLoggingEnabled()) {
        long timestamp = System.currentTimeMillis();
        TransactionLogWriter writer = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.location());

        ImmutableMap<String, String> details = ImmutableMap.of(
                "queryId", session.getQueryId(),
                "numDeletedFiles", String.valueOf(numDeletedFiles),
                "numVacuumedDirectories", String.valueOf(numVacuumedDirectories),
                "status", status);
        // TODO: blindAppend false OK?
        CommitInfoEntry endEntry = getCommitInfoEntry(commitVersion, timestamp, session, "VACUUM END", details, readVersion);

        writer.appendCommitInfoEntry(endEntry);
        writer.flush();
    }
}

/**
 * Builds the commit info entry shared by the vacuum start and end log records.
 * The session user is passed twice, and the entry is tagged with a
 * {@code trino-<nodeVersion>-<nodeId>} identifier, the WRITESERIALIZABLE isolation
 * level and a {@code true} trailing flag.
 *
 * @param commitVersion version under which the entry is written
 * @param createdTime creation timestamp in milliseconds
 * @param session session whose user is recorded in the entry
 * @param operation operation name to record
 * @param metrics string key/value details attached to the entry
 * @param readVersion table version the operation read
 * @return the assembled commit info entry
 */
private CommitInfoEntry getCommitInfoEntry(long commitVersion, long createdTime, ConnectorSession session, String operation, ImmutableMap<String, String> metrics, long readVersion)
{
    // NOTE(review): positional meanings beyond what the argument names show are
    // assumptions; confirm against the CommitInfoEntry constructor.
    CommitInfoEntry entry = new CommitInfoEntry(
            commitVersion,
            createdTime,
            session.getUser(),
            session.getUser(),
            operation,
            metrics,
            null,
            null,
            "trino-" + nodeVersion + "-" + nodeId,
            readVersion,
            IsolationLevel.WRITESERIALIZABLE.getValue(),
            Optional.of(true));
    return entry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ private QueryRunner createDeltaLakeQueryRunner()
.put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("delta.register-table-procedure.enabled", "true")
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("delta.vacuum.logging.enabled", "true")
.putAll(deltaStorageConfiguration())
.buildOrThrow())
.setSchemaLocation(getLocationForTable(bucketName, SCHEMA))
Expand Down Expand Up @@ -1746,6 +1747,9 @@ public void testVacuum()
assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
// old files should be cleaned up
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(updatedFiles);
// operations should be logged
// Expected operation names must match what the vacuum procedure writes to the log: "VACUUM START" / "VACUUM END".
assertQuery("SELECT version, operation FROM \"" + tableName + "$history\"", "VALUES (0, 'CREATE TABLE AS SELECT'), (1, 'MERGE'), (2, 'VACUUM START'), (3, 'VACUUM END'), (4, 'VACUUM START'), (5, 'VACUUM END')");

}
finally {
assertUpdate("DROP TABLE " + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void testDefaults()
.setQueryPartitionFilterRequired(false)
.setDeletionVectorsEnabled(false)
.setDeltaLogFileSystemCacheDisabled(false)
.setMetadataParallelism(8));
.setMetadataParallelism(8)
.setVacuumLoggingEnabled(false));
}

@Test
Expand Down Expand Up @@ -118,6 +119,7 @@ public void testExplicitPropertyMappings()
.put("delta.deletion-vectors-enabled", "true")
.put("delta.fs.cache.disable-transaction-log-caching", "true")
.put("delta.metadata.parallelism", "10")
.put("delta.vacuum.logging.enabled", "true")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
Expand Down Expand Up @@ -156,7 +158,8 @@ public void testExplicitPropertyMappings()
.setQueryPartitionFilterRequired(true)
.setDeletionVectorsEnabled(true)
.setDeltaLogFileSystemCacheDisabled(true)
.setMetadataParallelism(10);
.setMetadataParallelism(10)
.setVacuumLoggingEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit 1ae966f

Please sign in to comment.