Skip to content

Commit

Permalink
Register VACUUM operations in the delta log
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet committed Dec 10, 2024
1 parent c69387c commit e65ea00
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class DeltaLakeConfig
private long defaultCheckpointWritingInterval = 10;
private boolean checkpointFilteringEnabled = true;
private Duration vacuumMinRetention = new Duration(7, DAYS);
private boolean vacuumLoggingEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
Expand Down Expand Up @@ -280,6 +281,19 @@ public DeltaLakeConfig setVacuumMinRetention(Duration vacuumMinRetention)
return this;
}

public boolean isVacuumLoggingEnabled()
{
return vacuumLoggingEnabled;
}

@Config("delta.vacuum.logging.enabled")
@ConfigDescription("Whether to log vacuum information into the Delta transaction log")
public DeltaLakeConfig setVacuumLoggingEnabled(boolean vacuumLoggingEnabled)
{
this.vacuumLoggingEnabled = vacuumLoggingEnabled;
return this;
}

public Optional<String> getHiveCatalogName()
{
return hiveCatalogName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class DeltaLakeSessionProperties
{
public static final String MAX_SPLIT_SIZE = "max_split_size";
public static final String VACUUM_MIN_RETENTION = "vacuum_min_retention";
public static final String VACUUM_LOGGING_ENABLED = "vacuum_logging_enabled";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
Expand Down Expand Up @@ -96,6 +97,11 @@ public DeltaLakeSessionProperties(
"Minimal retention period for vacuum procedure",
deltaLakeConfig.getVacuumMinRetention(),
false),
booleanProperty(
VACUUM_LOGGING_ENABLED,
"Vacuum logging enabled",
deltaLakeConfig.isVacuumLoggingEnabled(),
false),
stringProperty(
HIVE_CATALOG_NAME,
"Catalog to redirect to when a Hive table is referenced",
Expand Down Expand Up @@ -255,6 +261,11 @@ public static Duration getVacuumMinRetention(ConnectorSession session)
return session.getProperty(VACUUM_MIN_RETENTION, Duration.class);
}

public static boolean isVacuumLoggingEnabled(ConnectorSession session)
{
return session.getProperty(VACUUM_LOGGING_ENABLED, Boolean.class);
}

public static Optional<String> getHiveCatalogName(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty(HIVE_CATALOG_NAME, String.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
Expand All @@ -32,11 +33,15 @@
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.classloader.ThreadContextClassLoader;
Expand All @@ -52,6 +57,7 @@
import java.lang.invoke.MethodHandle;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -62,13 +68,16 @@
import static com.google.common.base.Predicates.alwaysFalse;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.filesystem.Locations.areDirectoryLocationsEquivalent;
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isVacuumLoggingEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.IsolationLevel;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
Expand Down Expand Up @@ -100,18 +109,26 @@ public class VacuumProcedure
private final TrinoFileSystemFactory fileSystemFactory;
private final DeltaLakeMetadataFactory metadataFactory;
private final TransactionLogAccess transactionLogAccess;
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final String nodeVersion;
private final String nodeId;

@Inject
public VacuumProcedure(
CatalogName catalogName,
TrinoFileSystemFactory fileSystemFactory,
DeltaLakeMetadataFactory metadataFactory,
TransactionLogAccess transactionLogAccess)
TransactionLogAccess transactionLogAccess,
TransactionLogWriterFactory transactionLogWriterFactory,
NodeManager nodeManager)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
this.nodeVersion = nodeManager.getCurrentNode().getVersion();
this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
}

@Override
Expand Down Expand Up @@ -166,6 +183,7 @@ private void doVacuum(

Duration retentionDuration = Duration.valueOf(retention);
Duration minRetention = getVacuumMinRetention(session);
boolean isVacuumLoggingEnabled = isVacuumLoggingEnabled(session);
checkProcedureArgument(
retentionDuration.compareTo(minRetention) >= 0,
"Retention specified (%s) is shorter than the minimum retention configured in the system (%s). " +
Expand Down Expand Up @@ -206,10 +224,10 @@ private void doVacuum(
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(DELETION_VECTORS_FEATURE_NAME));
}

String tableLocation = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocation);
String tableLocationString = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocationString);
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
String commonPathPrefix = tableLocationString.endsWith("/") ? tableLocationString : tableLocationString + "/";
String queryId = session.getQueryId();

// Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent").
Expand Down Expand Up @@ -239,42 +257,53 @@ private void doVacuum(
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::path))
.peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path))
.peek(path -> checkState(!path.startsWith(tableLocationString), "Unexpected absolute path in transaction log: %s", path))
.collect(toImmutableSet());
}

log.debug(
"[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention",
queryId,
tableName,
tableLocation,
tableLocationString,
retention,
threshold,
retainedPaths.size());

Location tableLocation = Location.of(tableLocationString);
long allPathsChecked = 0;
long transactionLogFiles = 0;
long retainedKnownFiles = 0;
long retainedUnknownFiles = 0;
List<TrinoInputFile> filesToDelete = new ArrayList<>();
Set<String> vacuumedDirectories = new HashSet<>();
vacuumedDirectories.add(tableLocation.path());
long filesToDeleteSize = 0;
int numActualFilesDeleted = 0;

FileIterator listing = fileSystem.listFiles(Location.of(tableLocation));
FileIterator listing = fileSystem.listFiles(tableLocation);
while (listing.hasNext()) {
FileEntry entry = listing.next();

String location = entry.location().toString();
checkState(
location.startsWith(commonPathPrefix),
"Unexpected path [%s] returned when listing files under [%s]",
location,
tableLocation);
tableLocationString);
String relativePath = location.substring(commonPathPrefix.length());
if (relativePath.isEmpty()) {
// A file returned for "tableLocation/", might be possible on S3.
// A file returned for "tableLocationString/", might be possible on S3.
continue;
}
allPathsChecked++;
Location parentDirectory = entry.location().parentDirectory();
while (!areDirectoryLocationsEquivalent(parentDirectory, tableLocation)) {
vacuumedDirectories.add(parentDirectory.path());
parentDirectory = parentDirectory.parentDirectory();
}

// ignore tableLocation/_delta_log/**
// ignore tableLocationString/_delta_log/**
if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) {
log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location);
transactionLogFiles++;
Expand All @@ -300,27 +329,85 @@ private void doVacuum(
Location fileLocation = Location.of(location);
TrinoInputFile inputFile = fileSystem.newInputFile(fileLocation);
filesToDelete.add(inputFile);
filesToDeleteSize += inputFile.length();
}
long readVersion = handle.getReadVersion();
if (isVacuumLoggingEnabled) {
logVacuumStart(handle.location(), session, readVersion, filesToDelete.size(), filesToDeleteSize);
}
int totalFilesToDelete = filesToDelete.size();
int batchCount = (int) Math.ceil((double) totalFilesToDelete / DELETE_BATCH_SIZE);
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
int start = batchNumber * DELETE_BATCH_SIZE;
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);
try {
for (int batchNumber = 0; batchNumber < batchCount; batchNumber++) {
int start = batchNumber * DELETE_BATCH_SIZE;
int end = Math.min(start + DELETE_BATCH_SIZE, totalFilesToDelete);

List<TrinoInputFile> batch = filesToDelete.subList(start, end);
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
List<TrinoInputFile> batch = filesToDelete.subList(start, end);
fileSystem.deleteFiles(batch.stream().map(TrinoInputFile::location).collect(toImmutableList()));
numActualFilesDeleted += batch.size();
}
}
catch (IOException e) {
if (isVacuumLoggingEnabled) {
// This mimics Delta behaviour where it sets metrics to 0 in case of a failure
logVacuumEnd(handle.location(), session, readVersion, 0, 0, "FAILED");
}
throw e;
}
if (isVacuumLoggingEnabled) {
logVacuumEnd(handle.location(), session, readVersion, numActualFilesDeleted, vacuumedDirectories.size(), "COMPLETED");
}

log.info(
"[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s",
queryId,
tableName,
tableLocation,
tableLocationString,
allPathsChecked,
transactionLogFiles,
retainedKnownFiles,
retainedUnknownFiles,
totalFilesToDelete);
}
}

private void logVacuumStart(String location, ConnectorSession session, long readVersion, long numFilesToDelete, long filesToDeleteSize)
throws IOException
{
long createdTime = System.currentTimeMillis();
long commitVersion = readVersion + 1;

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(commitVersion, createdTime, session, "VACUUM START", ImmutableMap.of("queryId", session.getQueryId()), ImmutableMap.of("numFilesToDelete", String.valueOf(numFilesToDelete), "sizeOfDataToDelete", String.valueOf(filesToDeleteSize)), readVersion));

transactionLogWriter.flush();
}

private void logVacuumEnd(String location, ConnectorSession session, long readVersion, int numDeletedFiles, int numVacuumedDirectories, String status)
throws IOException
{
long createdTime = System.currentTimeMillis();
long commitVersion = readVersion + 2;

TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location);
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(commitVersion, createdTime, session, "VACUUM END", ImmutableMap.of("queryId", session.getQueryId(), "status", status), ImmutableMap.of("numDeletedFiles", String.valueOf(numDeletedFiles), "numVacuumedDirectories", String.valueOf(numVacuumedDirectories)), readVersion));
transactionLogWriter.flush();
}

private CommitInfoEntry getCommitInfoEntry(long commitVersion, long createdTime, ConnectorSession session, String operation, ImmutableMap<String, String> operationParameters, ImmutableMap<String, String> operationMetrics, long readVersion)
{
return new CommitInfoEntry(
commitVersion,
createdTime,
session.getUser(),
session.getUser(),
operation,
operationParameters,
null,
null,
"trino-" + nodeVersion + "-" + nodeId,
readVersion,
IsolationLevel.WRITESERIALIZABLE.getValue(),
Optional.of(true),
operationMetrics);
}
}
Loading

0 comments on commit e65ea00

Please sign in to comment.