Support adding columns in Delta Lake
ebyhr committed May 24, 2022
1 parent f365a80 commit 0da6e01
Showing 2 changed files with 193 additions and 6 deletions.
DeltaLakeMetadata.java:

@@ -223,6 +223,7 @@ public class DeltaLakeMetadata
"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat");
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
public static final String INSERT_OPERATION = "WRITE";
public static final String DELETE_OPERATION = "DELETE";
public static final String UPDATE_OPERATION = "UPDATE";
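
Each constant here becomes the `operation` field of the `commitInfo` action that opens a Delta Lake commit file, which is how other engines can tell what kind of change a Trino commit made; "ADD COLUMNS" appears to match the operation name Delta Lake's own writer records for schema additions. A rough sketch of the entry the new code path ends up producing (field values illustrative, not taken from this commit):

    // First action in _delta_log/<version>.json for an ADD COLUMNS commit (sketch):
    // {"commitInfo":{"timestamp":1653350000000,"userName":"user",
    //                "operation":"ADD COLUMNS","readVersion":3,"isBlindAppend":true}}
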
@@ -614,8 +615,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata
                 .map(column -> toColumnHandle(column, partitionColumns))
                 .collect(toImmutableList());
         TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString());
-        appendInitialTableEntries(
+        appendTableEntries(
+                0,
                 transactionLogWriter,
+                randomUUID().toString(),
                 deltaLakeColumns,
                 partitionColumns,
                 buildDeltaMetadataConfiguration(checkpointInterval),
@@ -865,8 +868,10 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
         // filesystems for which we have proper implementations of TransactionLogSynchronizers.
         TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.getLocation());
 
-        appendInitialTableEntries(
+        appendTableEntries(
+                0,
                 transactionLogWriter,
+                randomUUID().toString(),
                 handle.getInputColumns(),
                 handle.getPartitionedBy(),
                 buildDeltaMetadataConfiguration(handle.getCheckpointInterval()),
@@ -896,8 +901,52 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
         return Optional.empty();
     }
 
-    private static void appendInitialTableEntries(
+    @Override
+    public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata)
+    {
+        if (newColumnMetadata.getComment() != null) {
+            throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments");
+        }
+
+        DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
+        ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
+
+        try {
+            long commitVersion = handle.getReadVersion() + 1;
+
+            List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties());
+            ImmutableList.Builder<DeltaLakeColumnHandle> columnsBuilder = ImmutableList.builder();
+            columnsBuilder.addAll(tableMetadata.getColumns().stream()
+                    .filter(column -> !column.isHidden())
+                    .map(column -> toColumnHandle(column, partitionColumns))
+                    .collect(toImmutableList()));
+            columnsBuilder.add(toColumnHandle(newColumnMetadata, partitionColumns));
+
+            Optional<Long> checkpointInterval = DeltaLakeTableProperties.getCheckpointInterval(tableMetadata.getProperties());
+
+            TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation());
+            appendTableEntries(
+                    commitVersion,
+                    transactionLogWriter,
+                    handle.getMetadataEntry().getId(),
+                    columnsBuilder.build(),
+                    partitionColumns,
+                    buildDeltaMetadataConfiguration(checkpointInterval),
+                    ADD_COLUMN_OPERATION,
+                    session,
+                    nodeVersion,
+                    nodeId);
+            transactionLogWriter.flush();
+        }
+        catch (Exception e) {
+            throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add '%s' column for: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()), e);
+        }
+    }
+
+    private static void appendTableEntries(
+            long commitVersion,
             TransactionLogWriter transactionLogWriter,
+            String tableId,
             List<DeltaLakeColumnHandle> columns,
             List<String> partitionColumnNames,
             Map<String, String> configuration,
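
Note that `addColumn` does not write an incremental "column added" record: it rebuilds the complete column list (every existing non-hidden column plus the new one) and hands it to `appendTableEntries`, because a `metaData` action in the Delta Lake protocol always carries the entire schema and wholly replaces the previous one. Sketched as a comment (abbreviated and illustrative, not copied from this commit):

    // Second action in the ADD COLUMNS commit; schemaString holds the FULL schema,
    // with the new column appended after the existing fields:
    // {"metaData":{"id":"<existing table id>","format":{"provider":"parquet","options":{}},
    //              "schemaString":"{\"type\":\"struct\",\"fields\":[...existing..., ...new...]}",
    //              "partitionColumns":[...],"configuration":{...}}}
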
Expand All @@ -909,7 +958,7 @@ private static void appendInitialTableEntries(
         long createdTime = System.currentTimeMillis();
         transactionLogWriter.appendCommitInfoEntry(
                 new CommitInfoEntry(
-                        0,
+                        commitVersion,
                         createdTime,
                         session.getUser(),
                         session.getUser(),
@@ ... @@

         transactionLogWriter.appendMetadataEntry(
                 new MetadataEntry(
-                        randomUUID().toString(),
+                        tableId,
                         null,
                         null,
                         new Format("parquet", ImmutableMap.of()),
Second changed file (Delta Lake connector test):
@@ -13,8 +13,10 @@
  */
 package io.trino.plugin.deltalake;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.trino.Session;
 import io.trino.execution.QueryInfo;
 import io.trino.plugin.deltalake.util.DockerizedMinioDataLake;
 import io.trino.testing.BaseConnectorTest;
@@ -29,17 +31,23 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Sets.union;
 import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake;
 import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
+import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA;
 import static io.trino.testing.assertions.Assert.assertEquals;
 import static io.trino.testing.sql.TestTable.randomTableSuffix;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -98,7 +106,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
             case SUPPORTS_TOPN_PUSHDOWN:
             case SUPPORTS_AGGREGATION_PUSHDOWN:
             case SUPPORTS_RENAME_TABLE:
-            case SUPPORTS_ADD_COLUMN:
+            case SUPPORTS_ADD_COLUMN_WITH_COMMENT:
             case SUPPORTS_DROP_COLUMN:
             case SUPPORTS_RENAME_COLUMN:
             case SUPPORTS_COMMENT_ON_TABLE:
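
With `SUPPORTS_ADD_COLUMN` no longer pinned to false, the generic ADD COLUMN tests inherited from `BaseConnectorTest` now run against Delta Lake, while the new `SUPPORTS_ADD_COLUMN_WITH_COMMENT` case keeps the comment flavor disabled, matching the `NOT_SUPPORTED` check in `addColumn` above. In the idiom of this test class, the resulting split behaves roughly like this (hypothetical table name):

    // Plain ADD COLUMN now succeeds; a column comment is still rejected.
    assertUpdate("ALTER TABLE test_table ADD COLUMN c varchar(50)");
    assertQueryFails(
            "ALTER TABLE test_table ADD COLUMN d varchar(50) COMMENT 'd comment'",
            "This connector does not support adding columns with comments");
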
@@ -139,6 +147,20 @@ protected void verifyConcurrentInsertFailurePermissible(Exception e)
"|Target file .* was created during locking");
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageMatching("Unable to add '.*' column for: .*")
.getCause()
.hasMessageMatching(
"Transaction log locked.*" +
"|.*/_delta_log/\\d+.json already exists" +
"|Conflicting concurrent writes found..*" +
"|Multiple live locks found for:.*" +
"|Target file .* was created during locking");
}

@Override
protected Optional<DataMappingTestSetup> filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup)
{
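
The accepted messages are the ways an optimistic Delta commit can lose a race: `addColumn` targets `_delta_log/<readVersion + 1>.json`, and if a concurrent writer creates that file (or holds the table lock) first, the operation fails and surfaces through the `DELTA_LAKE_BAD_WRITE` wrapper whose message is asserted here. A minimal fail-fast sketch of that commit pattern, with hypothetical helper names:

    // Hedged sketch of optimistic transaction-log commit, not the connector's actual API.
    long commitVersion = readVersion + 1;
    String entryPath = format("%s/_delta_log/%020d.json", tableLocation, commitVersion);
    try {
        createExclusively(entryPath, serializedActions); // hypothetical: must fail if the file already exists
    }
    catch (FileAlreadyExistsException e) {
        // Another writer won the race to this version; abort instead of overwriting.
        throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Conflicting concurrent writes found", e);
    }
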
@@ -308,14 +330,130 @@ public Object[][] timestampValues()
{"9999-12-31 23:59:59.999 UTC"}};
}

@Test
public void testAddColumnToPartitionedTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_partitioned_table_", "(x VARCHAR, part VARCHAR) WITH (partitioned_by = ARRAY['part'])")) {
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first', 'part-0001'", 1);
assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN x bigint", ".* Column 'x' already exists");
assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN part bigint", ".* Column 'part' already exists");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'part-0002', 'xxx'", 1);
assertQuery(
"SELECT x, part, a FROM " + table.getName(),
"VALUES ('first', 'part-0001', NULL), ('second', 'part-0002', 'xxx')");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN b double");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'third', 'part-0003', 'yyy', 33.3E0", 1);
assertQuery(
"SELECT x, part, a, b FROM " + table.getName(),
"VALUES ('first', 'part-0001', NULL, NULL), ('second', 'part-0002', 'xxx', NULL), ('third', 'part-0003', 'yyy', 33.3)");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN IF NOT EXISTS c varchar(50)");
assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN IF NOT EXISTS part varchar(50)");
assertUpdate("INSERT INTO " + table.getName() + " SELECT 'fourth', 'part-0004', 'zzz', 55.3E0, 'newColumn'", 1);
assertQuery(
"SELECT x, part, a, b, c FROM " + table.getName(),
"VALUES ('first', 'part-0001', NULL, NULL, NULL), ('second', 'part-0002', 'xxx', NULL, NULL), ('third', 'part-0003', 'yyy', 33.3, NULL), ('fourth', 'part-0004', 'zzz', 55.3, 'newColumn')");
}
}

     private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId<MaterializedResult> queryResult)
     {
         return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId());
     }
 
+    @Test
+    public void testAddColumnAndOptimize()
+    {
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_and_optimize", "(x VARCHAR)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1);
+
+            assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'xxx'", 1);
+            assertQuery(
+                    "SELECT x, a FROM " + table.getName(),
+                    "VALUES ('first', NULL), ('second', 'xxx')");
+
+            Set<String> beforeActiveFiles = getActiveFiles(table.getName());
+            computeActual("ALTER TABLE " + table.getName() + " EXECUTE OPTIMIZE");
+
+            // Verify OPTIMIZE happened, but table data didn't change
+            assertThat(beforeActiveFiles).isNotEqualTo(getActiveFiles(table.getName()));
+            assertQuery(
+                    "SELECT x, a FROM " + table.getName(),
+                    "VALUES ('first', NULL), ('second', 'xxx')");
+        }
+    }
+
+    @Test
+    public void testAddColumnAndVacuum()
+            throws Exception
+    {
+        Session sessionWithShortRetentionUnlocked = Session.builder(getSession())
+                .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "vacuum_min_retention", "0s")
+                .build();
+
+        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_and_optimize", "(x VARCHAR)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1);
+            assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second'", 1);
+
+            Set<String> initialFiles = getActiveFiles(table.getName());
+            assertThat(initialFiles).hasSize(2);
+
+            assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");
+
+            assertUpdate("UPDATE " + table.getName() + " SET a = 'new column'", 2);
+            Stopwatch timeSinceUpdate = Stopwatch.createStarted();
+            Set<String> updatedFiles = getActiveFiles(table.getName());
+            assertThat(updatedFiles).hasSize(2).doesNotContainAnyElementsOf(initialFiles);
+            assertThat(getAllDataFilesFromTableDirectory(table.getName())).isEqualTo(union(initialFiles, updatedFiles));
+
+            assertQuery(
+                    "SELECT x, a FROM " + table.getName(),
+                    "VALUES ('first', 'new column'), ('second', 'new column')");
+
+            MILLISECONDS.sleep(1_000 - timeSinceUpdate.elapsed(MILLISECONDS) + 1);
+            assertUpdate(sessionWithShortRetentionUnlocked, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + table.getName() + "', retention => '1s')");
+
+            // Verify VACUUM happened, but table data didn't change
+            assertThat(getAllDataFilesFromTableDirectory(table.getName())).isEqualTo(updatedFiles);
+            assertQuery(
+                    "SELECT x, a FROM " + table.getName(),
+                    "VALUES ('first', 'new column'), ('second', 'new column')");
+        }
+    }
+
     @Override
     protected String createSchemaSql(String schemaName)
     {
         return "CREATE SCHEMA " + schemaName + " WITH (location = 's3://" + bucketName + "/" + schemaName + "')";
     }
+
+    private Set<String> getActiveFiles(String tableName)
+    {
+        return getActiveFiles(tableName, getQueryRunner().getDefaultSession());
+    }
+
+    private Set<String> getActiveFiles(String tableName, Session session)
+    {
+        return computeActual(session, "SELECT DISTINCT \"$path\" FROM " + tableName).getOnlyColumnAsSet().stream()
+                .map(String.class::cast)
+                .collect(toImmutableSet());
+    }
+
+    private Set<String> getAllDataFilesFromTableDirectory(String tableName)
+    {
+        return getTableFiles(tableName).stream()
+                .filter(path -> !path.contains("/" + TRANSACTION_LOG_DIRECTORY))
+                .collect(toImmutableSet());
+    }
+
+    private List<String> getTableFiles(String tableName)
+    {
+        return dockerizedMinioDataLake.listFiles(format("%s/%s", SCHEMA, tableName)).stream()
+                .map(path -> format("s3://%s/%s", bucketName, path))
+                .collect(toImmutableList());
+    }
 }
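
Taken together, the change enables the standard ALTER TABLE flow on Delta tables, with new columns added as a metadata-only commit. An end-to-end sketch in the same test idiom (hypothetical table name; schema location set up as in `createSchemaSql` above):

    assertUpdate("CREATE TABLE add_col_demo (x varchar)");
    assertUpdate("INSERT INTO add_col_demo VALUES ('first')", 1);
    assertUpdate("ALTER TABLE add_col_demo ADD COLUMN y double");
    // Existing rows read NULL for the new column; no data files are rewritten.
    assertQuery("SELECT x, y FROM add_col_demo", "VALUES ('first', NULL)");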
