From 0da6e0101d67f1a40372f766030db22ba5ecd422 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 13 May 2022 14:51:23 +0900 Subject: [PATCH] Support adding columns in Delta Lake --- .../plugin/deltalake/DeltaLakeMetadata.java | 59 +++++++- .../BaseDeltaLakeMinioConnectorTest.java | 140 +++++++++++++++++- 2 files changed, 193 insertions(+), 6 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 1f316fadb39a..8299f58954cc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -223,6 +223,7 @@ public class DeltaLakeMetadata "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"); public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT"; public static final String CREATE_TABLE_OPERATION = "CREATE TABLE"; + public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS"; public static final String INSERT_OPERATION = "WRITE"; public static final String DELETE_OPERATION = "DELETE"; public static final String UPDATE_OPERATION = "UPDATE"; @@ -614,8 +615,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe .map(column -> toColumnHandle(column, partitionColumns)) .collect(toImmutableList()); TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString()); - appendInitialTableEntries( + appendTableEntries( + 0, transactionLogWriter, + randomUUID().toString(), deltaLakeColumns, partitionColumns, buildDeltaMetadataConfiguration(checkpointInterval), @@ -865,8 +868,10 @@ public Optional finishCreateTable( // filesystems for which we have proper implementations of TransactionLogSynchronizers. TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.getLocation()); - appendInitialTableEntries( + appendTableEntries( + 0, transactionLogWriter, + randomUUID().toString(), handle.getInputColumns(), handle.getPartitionedBy(), buildDeltaMetadataConfiguration(handle.getCheckpointInterval()), @@ -896,8 +901,52 @@ public Optional finishCreateTable( return Optional.empty(); } - private static void appendInitialTableEntries( + @Override + public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata) + { + if (newColumnMetadata.getComment() != null) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments"); + } + + DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; + ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); + + try { + long commitVersion = handle.getReadVersion() + 1; + + List partitionColumns = getPartitionedBy(tableMetadata.getProperties()); + ImmutableList.Builder columnsBuilder = ImmutableList.builder(); + columnsBuilder.addAll(tableMetadata.getColumns().stream() + .filter(column -> !column.isHidden()) + .map(column -> toColumnHandle(column, partitionColumns)) + .collect(toImmutableList())); + columnsBuilder.add(toColumnHandle(newColumnMetadata, partitionColumns)); + + Optional checkpointInterval = DeltaLakeTableProperties.getCheckpointInterval(tableMetadata.getProperties()); + + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation()); + appendTableEntries( + commitVersion, + transactionLogWriter, + handle.getMetadataEntry().getId(), + columnsBuilder.build(), + partitionColumns, + buildDeltaMetadataConfiguration(checkpointInterval), + ADD_COLUMN_OPERATION, + session, + nodeVersion, + nodeId); + transactionLogWriter.flush(); + } + catch (Exception e) { + throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add '%s' column for: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()), e); + } + } + + private static void appendTableEntries( + long commitVersion, TransactionLogWriter transactionLogWriter, + String tableId, List columns, List partitionColumnNames, Map configuration, @@ -909,7 +958,7 @@ private static void appendInitialTableEntries( long createdTime = System.currentTimeMillis(); transactionLogWriter.appendCommitInfoEntry( new CommitInfoEntry( - 0, + commitVersion, createdTime, session.getUser(), session.getUser(), @@ -926,7 +975,7 @@ private static void appendInitialTableEntries( transactionLogWriter.appendMetadataEntry( new MetadataEntry( - randomUUID().toString(), + tableId, null, null, new Format("parquet", ImmutableMap.of()), diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index d6169850219d..48fad77d4110 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -13,8 +13,10 @@ */ package io.trino.plugin.deltalake; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.trino.Session; import io.trino.execution.QueryInfo; import io.trino.plugin.deltalake.util.DockerizedMinioDataLake; import io.trino.testing.BaseConnectorTest; @@ -29,17 +31,23 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.List; import java.util.Optional; import java.util.Set; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Sets.union; import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA; import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -98,7 +106,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_TOPN_PUSHDOWN: case SUPPORTS_AGGREGATION_PUSHDOWN: case SUPPORTS_RENAME_TABLE: - case SUPPORTS_ADD_COLUMN: + case SUPPORTS_ADD_COLUMN_WITH_COMMENT: case SUPPORTS_DROP_COLUMN: case SUPPORTS_RENAME_COLUMN: case SUPPORTS_COMMENT_ON_TABLE: @@ -139,6 +147,20 @@ protected void verifyConcurrentInsertFailurePermissible(Exception e) "|Target file .* was created during locking"); } + @Override + protected void verifyConcurrentAddColumnFailurePermissible(Exception e) + { + assertThat(e) + .hasMessageMatching("Unable to add '.*' column for: .*") + .getCause() + .hasMessageMatching( + "Transaction log locked.*" + + "|.*/_delta_log/\\d+.json already exists" + + "|Conflicting concurrent writes found..*" + + "|Multiple live locks found for:.*" + + "|Target file .* was created during locking"); + } + @Override protected Optional filterCaseSensitiveDataMappingTestData(DataMappingTestSetup dataMappingTestSetup) { @@ -308,14 +330,130 @@ public Object[][] timestampValues() {"9999-12-31 23:59:59.999 UTC"}}; } + @Test + public void testAddColumnToPartitionedTable() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_partitioned_table_", "(x VARCHAR, part VARCHAR) WITH (partitioned_by = ARRAY['part'])")) { + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first', 'part-0001'", 1); + assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN x bigint", ".* Column 'x' already exists"); + assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN part bigint", ".* Column 'part' already exists"); + + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)"); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'part-0002', 'xxx'", 1); + assertQuery( + "SELECT x, part, a FROM " + table.getName(), + "VALUES ('first', 'part-0001', NULL), ('second', 'part-0002', 'xxx')"); + + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN b double"); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'third', 'part-0003', 'yyy', 33.3E0", 1); + assertQuery( + "SELECT x, part, a, b FROM " + table.getName(), + "VALUES ('first', 'part-0001', NULL, NULL), ('second', 'part-0002', 'xxx', NULL), ('third', 'part-0003', 'yyy', 33.3)"); + + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN IF NOT EXISTS c varchar(50)"); + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN IF NOT EXISTS part varchar(50)"); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'fourth', 'part-0004', 'zzz', 55.3E0, 'newColumn'", 1); + assertQuery( + "SELECT x, part, a, b, c FROM " + table.getName(), + "VALUES ('first', 'part-0001', NULL, NULL, NULL), ('second', 'part-0002', 'xxx', NULL, NULL), ('third', 'part-0003', 'yyy', 33.3, NULL), ('fourth', 'part-0004', 'zzz', 55.3, 'newColumn')"); + } + } + private QueryInfo getQueryInfo(DistributedQueryRunner queryRunner, ResultWithQueryId queryResult) { return queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(queryResult.getQueryId()); } + @Test + public void testAddColumnAndOptimize() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_and_optimize", "(x VARCHAR)")) { + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1); + + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)"); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second', 'xxx'", 1); + assertQuery( + "SELECT x, a FROM " + table.getName(), + "VALUES ('first', NULL), ('second', 'xxx')"); + + Set beforeActiveFiles = getActiveFiles(table.getName()); + computeActual("ALTER TABLE " + table.getName() + " EXECUTE OPTIMIZE"); + + // Verify OPTIMIZE happened, but table data didn't change + assertThat(beforeActiveFiles).isNotEqualTo(getActiveFiles(table.getName())); + assertQuery( + "SELECT x, a FROM " + table.getName(), + "VALUES ('first', NULL), ('second', 'xxx')"); + } + } + + @Test + public void testAddColumnAndVacuum() + throws Exception + { + Session sessionWithShortRetentionUnlocked = Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "vacuum_min_retention", "0s") + .build(); + + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column_and_optimize", "(x VARCHAR)")) { + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second'", 1); + + Set initialFiles = getActiveFiles(table.getName()); + assertThat(initialFiles).hasSize(2); + + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)"); + + assertUpdate("UPDATE " + table.getName() + " SET a = 'new column'", 2); + Stopwatch timeSinceUpdate = Stopwatch.createStarted(); + Set updatedFiles = getActiveFiles(table.getName()); + assertThat(updatedFiles).hasSize(2).doesNotContainAnyElementsOf(initialFiles); + assertThat(getAllDataFilesFromTableDirectory(table.getName())).isEqualTo(union(initialFiles, updatedFiles)); + + assertQuery( + "SELECT x, a FROM " + table.getName(), + "VALUES ('first', 'new column'), ('second', 'new column')"); + + MILLISECONDS.sleep(1_000 - timeSinceUpdate.elapsed(MILLISECONDS) + 1); + assertUpdate(sessionWithShortRetentionUnlocked, "CALL system.vacuum(schema_name => CURRENT_SCHEMA, table_name => '" + table.getName() + "', retention => '1s')"); + + // Verify VACUUM happened, but table data didn't change + assertThat(getAllDataFilesFromTableDirectory(table.getName())).isEqualTo(updatedFiles); + assertQuery( + "SELECT x, a FROM " + table.getName(), + "VALUES ('first', 'new column'), ('second', 'new column')"); + } + } + @Override protected String createSchemaSql(String schemaName) { return "CREATE SCHEMA " + schemaName + " WITH (location = 's3://" + bucketName + "/" + schemaName + "')"; } + + private Set getActiveFiles(String tableName) + { + return getActiveFiles(tableName, getQueryRunner().getDefaultSession()); + } + + private Set getActiveFiles(String tableName, Session session) + { + return computeActual(session, "SELECT DISTINCT \"$path\" FROM " + tableName).getOnlyColumnAsSet().stream() + .map(String.class::cast) + .collect(toImmutableSet()); + } + + private Set getAllDataFilesFromTableDirectory(String tableName) + { + return getTableFiles(tableName).stream() + .filter(path -> !path.contains("/" + TRANSACTION_LOG_DIRECTORY)) + .collect(toImmutableSet()); + } + + private List getTableFiles(String tableName) + { + return dockerizedMinioDataLake.listFiles(format("%s/%s", SCHEMA, tableName)).stream() + .map(path -> format("s3://%s/%s", bucketName, path)) + .collect(toImmutableList()); + } }