Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvements for the Delta Lake history system table #18427

Merged
merged 3 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,71 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;

public class DeltaLakeHistoryTable
implements SystemTable
{
private final SchemaTableName tableName;
private final String tableLocation;
private final TrinoFileSystemFactory fileSystemFactory;
private final TransactionLogAccess transactionLogAccess;
private final ConnectorTableMetadata tableMetadata;
private final List<CommitInfoEntry> commitInfoEntries;

public DeltaLakeHistoryTable(SchemaTableName tableName, List<CommitInfoEntry> commitInfoEntries, TypeManager typeManager)
public DeltaLakeHistoryTable(
SchemaTableName tableName,
String tableLocation,
TrinoFileSystemFactory fileSystemFactory,
TransactionLogAccess transactionLogAccess,
TypeManager typeManager)
{
requireNonNull(typeManager, "typeManager is null");
this.commitInfoEntries = ImmutableList.copyOf(requireNonNull(commitInfoEntries, "commitInfoEntries is null")).stream()
.sorted(comparingLong(CommitInfoEntry::getVersion).reversed())
.collect(toImmutableList());
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");

tableMetadata = new ConnectorTableMetadata(
this.tableMetadata = new ConnectorTableMetadata(
requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("version", BIGINT))
Expand Down Expand Up @@ -85,13 +110,69 @@ public ConnectorTableMetadata getTableMetadata()
/**
 * Builds the page source for the history system table.
 *
 * First loads the table snapshot and its metadata entry to verify the transaction log is readable,
 * then derives an optional version window from the constraint on the "version" column and loads
 * only the commit info entries inside that window via {@code TransactionLogTail.loadNewTail}.
 * Returns an {@code EmptyPageSource} when the derived window cannot contain any version.
 *
 * An {@code IOException} while loading the snapshot, and an {@code IOException} or
 * {@code RuntimeException} while reading commit info entries, is rethrown as a
 * {@code TrinoException} with {@code DELTA_LAKE_INVALID_SCHEMA}; a {@code TrinoException} is rethrown as is.
 *
 * NOTE(review): this text appears to be a rendered diff, so lines from the previous implementation
 * (the commitInfoEntries.isEmpty() check and the trailing buildPages(session) return) show up
 * alongside the new ones.
 */
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
if (commitInfoEntries.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
try {
// Verify the transaction log is readable
SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName()));
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(baseTableName, tableLocation, session);
transactionLogAccess.getMetadataEntry(tableSnapshot, session);
}
catch (IOException e) {
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
}

// Locate the "version" column by name; onlyElement() requires exactly one matching column
int versionColumnIndex = IntStream.range(0, tableMetadata.getColumns().size())
.filter(i -> tableMetadata.getColumns().get(i).getName().equals("version"))
.boxed()
.collect(onlyElement());

// Empty means "unbounded" on that side of the version window
Optional<Long> startVersionExclusive = Optional.empty();
Optional<Long> endVersionInclusive = Optional.empty();

if (constraint.getDomains().isPresent()) {
Map<Integer, Domain> domains = constraint.getDomains().get();
if (domains.containsKey(versionColumnIndex)) {
Domain versionDomain = domains.get(versionColumnIndex); // versionColumnIndex is resolved by column name above, so this does not rely on column ordering
// Only the overall span of the ranges is used, so disjoint ranges are read as one contiguous window
// NOTE(review): presumably getSpan() fails when the value set contains no ranges — confirm and guard if needed
Range range = versionDomain.getValues().getRanges().getSpan();
if (range.isSingleValue()) {
long value = (long) range.getSingleValue();
// A single version v maps to the window (v - 1, v]
startVersionExclusive = Optional.of(value - 1);
endVersionInclusive = Optional.of(value);
}
else {
Optional<Long> lowValue = range.getLowValue().map(Long.class::cast);
if (lowValue.isPresent()) {
// An inclusive lower bound is shifted down by one to become an exclusive start
startVersionExclusive = Optional.of(lowValue.get() - (range.isLowInclusive() ? 1 : 0));
}

Optional<Long> highValue = range.getHighValue().map(Long.class::cast);
if (highValue.isPresent()) {
// An exclusive upper bound is shifted down by one to become an inclusive end
endVersionInclusive = Optional.of(highValue.get() - (range.isHighInclusive() ? 0 : 1));
}
}
}
}

// The window (start, end] is empty when start >= end, so there is nothing to read
if (startVersionExclusive.isPresent() && endVersionInclusive.isPresent() && startVersionExclusive.get() >= endVersionInclusive.get()) {
return new EmptyPageSource();
}

TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
// Keep only log entries that carry commit info; other entry kinds yield null here and are dropped
List<CommitInfoEntry> commitInfoEntries = TransactionLogTail.loadNewTail(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive).getFileEntries().stream()
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
.collect(toImmutableList());
return new FixedPageSource(buildPages(session, commitInfoEntries));
}
catch (TrinoException e) {
// Rethrow unchanged so the broader RuntimeException handler below does not re-wrap it
throw e;
}
catch (IOException | RuntimeException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting commit info entries from " + tableLocation, e);
}
return new FixedPageSource(buildPages(session));
}

private List<Page> buildPages(ConnectorSession session)
private List<Page> buildPages(ConnectorSession session, List<CommitInfoEntry> commitInfoEntries)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3114,11 +3114,10 @@ private static boolean isFileCreatedByQuery(Location file, String queryId)
/**
 * Resolves the system table for the given name and, when one exists, wraps it in a
 * {@code ClassLoaderSafeSystemTable} bound to this class's class loader.
 *
 * NOTE(review): this text appears to be a rendered diff; the two return statements look like the
 * previous form (passing the session to getRawSystemTable) followed by its replacement.
 */
@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
return getRawSystemTable(session, tableName)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
return getRawSystemTable(tableName).map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName systemTableName)
private Optional<SystemTable> getRawSystemTable(SchemaTableName systemTableName)
{
Optional<DeltaLakeTableType> tableType = DeltaLakeTableName.tableTypeFrom(systemTableName.getTableName());
if (tableType.isEmpty() || tableType.get() == DeltaLakeTableType.DATA) {
Expand All @@ -3138,28 +3137,16 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
}

String tableLocation = table.get().location();
TableSnapshot tableSnapshot = getSnapshot(new SchemaTableName(systemTableName.getSchemaName(), tableName), tableLocation, session);
MetadataEntry metadataEntry;
try {
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
alexjo2144 marked this conversation as resolved.
Show resolved Hide resolved
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return Optional.empty();
}
throw e;
}

return switch (tableType.get()) {
case DATA -> throw new VerifyException("Unexpected DATA table type"); // Handled above.
case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
alexjo2144 marked this conversation as resolved.
Show resolved Hide resolved
systemTableName,
getCommitInfoEntries(tableLocation, session),
tableLocation,
fileSystemFactory,
transactionLogAccess,
typeManager));
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(
systemTableName,
metadataEntry,
transactionLogAccess.getProtocolEntry(session, tableSnapshot)));
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -29,6 +32,7 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;

import java.io.IOException;
import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
Expand All @@ -46,14 +50,16 @@ public class DeltaLakePropertiesTable
.add(new ColumnMetadata("value", VARCHAR))
.build();

private final SchemaTableName tableName;
private final String tableLocation;
private final TransactionLogAccess transactionLogAccess;
private final ConnectorTableMetadata tableMetadata;
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;

public DeltaLakePropertiesTable(SchemaTableName tableName, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
public DeltaLakePropertiesTable(SchemaTableName tableName, String tableLocation, TransactionLogAccess transactionLogAccess)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS);
}

Expand All @@ -72,10 +78,23 @@ public ConnectorTableMetadata getTableMetadata()
/**
 * Builds the page source for the properties system table.
 *
 * Loads the snapshot of the base table at {@code tableLocation}, reads its metadata and protocol
 * entries through {@code transactionLogAccess}, and returns a {@code FixedPageSource} over the
 * pages built from them. An {@code IOException} during loading is rethrown as a
 * {@code TrinoException} with {@code DELTA_LAKE_INVALID_SCHEMA}.
 *
 * NOTE(review): this text appears to be a rendered diff; the leading
 * {@code return new FixedPageSource(buildPages())} looks like the previous implementation.
 */
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages());
MetadataEntry metadataEntry;
ProtocolEntry protocolEntry;

try {
// Derive the base table name from the system table name before loading its snapshot
SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName()));
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(baseTableName, tableLocation, session);
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
}
catch (IOException e) {
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
}

return new FixedPageSource(buildPages(metadataEntry, protocolEntry));
}

private List<Page> buildPages()
private List<Page> buildPages(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1949,6 +1949,26 @@ public void testOptimizeUsingForcedPartitioning()
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
}

/**
 * Checks that the {@code $history} system table lists one row per commit and that
 * predicates on the {@code version} column (equality, open and closed ranges, disjunctions,
 * and an empty range) return exactly the matching commits.
 */
@Test
public void testHistoryTable()
{
    String baseName = "test_history_table_" + randomNameSuffix();
    try (TestTable testTable = new TestTable(getQueryRunner()::execute, baseName, "(int_col INTEGER)")) {
        String name = testTable.getName();
        // Shared prefix of every history query issued below
        String selectHistory = "SELECT version, operation FROM \"" + name + "$history\"";

        // Produce versions 1..4 on top of the CREATE TABLE commit (version 0)
        assertUpdate("INSERT INTO " + name + " VALUES 1, 2, 3", 3);
        assertUpdate("INSERT INTO " + name + " VALUES 4, 5, 6", 3);
        assertUpdate("DELETE FROM " + name + " WHERE int_col = 1", 1);
        assertUpdate("UPDATE " + name + " SET int_col = int_col * 2 WHERE int_col = 6", 1);

        assertQuery(
                selectHistory,
                "VALUES (0, 'CREATE TABLE'), (1, 'WRITE'), (2, 'WRITE'), (3, 'MERGE'), (4, 'MERGE')");
        assertQuery(
                selectHistory + " WHERE version = 3",
                "VALUES (3, 'MERGE')");
        assertQuery(
                selectHistory + " WHERE version > 3",
                "VALUES (4, 'MERGE')");
        assertQuery(
                selectHistory + " WHERE version >= 3 OR version = 1",
                "VALUES (1, 'WRITE'), (3, 'MERGE'), (4, 'MERGE')");
        assertQuery(
                selectHistory + " WHERE version >= 1 AND version < 3",
                "VALUES (1, 'WRITE'), (2, 'WRITE')");
        assertThat(query(selectHistory + " WHERE version > 1 AND version < 2")).returnsEmptyResult();
    }
}

/**
* @see BaseDeltaLakeRegisterTableProcedureTest for more detailed tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ private void testCorruptedTableLocation(String tableName, Path tableLocation, bo

// Assert queries fail cleanly
assertQueryFails("TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", ".* Table '.*\\$history' does not exist");
assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", ".* Table '.*\\$properties' does not exist");
assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT 1 FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SHOW CREATE TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
Expand Down
Loading