Skip to content

Commit

Permalink
Support creating table with column comment in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jun 7, 2022
1 parent 497a247 commit fff9862
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnComments;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
Expand Down Expand Up @@ -386,8 +387,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) table;
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
List<ColumnMetadata> columns = getColumns(tableHandle.getMetadataEntry()).stream()
.map(DeltaLakeMetadata::getColumnMetadata)
.map(column -> getColumnMetadata(column, columnComments.get(column.getName())))
.collect(toImmutableList());

ImmutableMap.Builder<String, Object> properties = ImmutableMap.<String, Object>builder()
Expand Down Expand Up @@ -434,7 +436,9 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
/**
 * Returns the metadata of a single column of the given table.
 * <p>
 * The column comment is looked up by column name in the comments extracted from the
 * table handle's metadata entry; when the column has no comment the lookup yields
 * {@code null}, which is passed on to the static two-argument helper.
 */
@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
// NOTE(review): this single-argument call looks like the pre-change line of the diff view,
// superseded by the three statements below -- confirm against the committed file
return getColumnMetadata((DeltaLakeColumnHandle) columnHandle);
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) columnHandle;
return getColumnMetadata(column, getColumnComments(table.getMetadataEntry()).get(column.getName()));
}

/**
Expand Down Expand Up @@ -495,8 +499,9 @@ public Stream<TableColumnsMetadata> streamTableColumns(ConnectorSession session,

// intentionally skip case when table snapshot is present but it lacks metadata portion
return metastore.getMetadata(metastore.getSnapshot(table, session), session).stream().map(metadata -> {
Map<String, String> columnComments = getColumnComments(metadata);
List<ColumnMetadata> columnMetadata = getColumns(metadata).stream()
.map(DeltaLakeMetadata::getColumnMetadata)
.map(column -> getColumnMetadata(column, columnComments.get(column.getName())))
.collect(toImmutableList());
return TableColumnsMetadata.forTable(table, columnMetadata);
});
Expand Down Expand Up @@ -585,10 +590,6 @@ public void dropSchema(ConnectorSession session, String schemaName)
@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
if (tableMetadata.getColumns().stream().anyMatch(column -> column.getComment() != null)) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with column comment");
}

SchemaTableName schemaTableName = tableMetadata.getTable();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
Expand Down Expand Up @@ -621,13 +622,17 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
.stream()
.map(column -> toColumnHandle(column, partitionColumns))
.collect(toImmutableList());
Map<String, String> columnComments = tableMetadata.getColumns().stream()
.filter(column -> column.getComment() != null)
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getComment));
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, targetPath.toString());
appendTableEntries(
0,
transactionLogWriter,
randomUUID().toString(),
deltaLakeColumns,
partitionColumns,
columnComments,
buildDeltaMetadataConfiguration(checkpointInterval),
CREATE_TABLE_OPERATION,
session,
Expand Down Expand Up @@ -889,6 +894,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
randomUUID().toString(),
handle.getInputColumns(),
handle.getPartitionedBy(),
ImmutableMap.of(),
buildDeltaMetadataConfiguration(handle.getCheckpointInterval()),
CREATE_TABLE_AS_OPERATION,
session,
Expand Down Expand Up @@ -965,6 +971,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
handle.getMetadataEntry().getId(),
columnsBuilder.build(),
partitionColumns,
getColumnComments(handle.getMetadataEntry()),
buildDeltaMetadataConfiguration(checkpointInterval),
ADD_COLUMN_OPERATION,
session,
Expand All @@ -984,6 +991,7 @@ private static void appendTableEntries(
String tableId,
List<DeltaLakeColumnHandle> columns,
List<String> partitionColumnNames,
Map<String, String> columnComments,
Map<String, String> configuration,
String operation,
ConnectorSession session,
Expand Down Expand Up @@ -1015,7 +1023,7 @@ private static void appendTableEntries(
null,
comment.orElse(null),
new Format("parquet", ImmutableMap.of()),
serializeSchemaAsJson(columns),
serializeSchemaAsJson(columns, columnComments),
partitionColumnNames,
ImmutableMap.copyOf(configuration),
createdTime));
Expand Down Expand Up @@ -2180,12 +2188,13 @@ public DeltaLakeMetastore getMetastore()
return metastore;
}

/**
 * Converts a column handle into the {@link ColumnMetadata} exposed by the connector.
 * <p>
 * Name and type are copied from the handle, the column is marked hidden when its
 * column type is {@code SYNTHESIZED}, and the comment is wrapped in an {@link Optional}
 * so that a {@code null} comment becomes an empty value.
 * <p>
 * NOTE(review): two signature lines are present because this is a diff view; the body
 * matches the two-argument form -- confirm against the committed file.
 *
 * @param column handle of the column to describe
 * @param comment column comment, or {@code null} when the column has none
 */
private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column)
private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment)
{
return ColumnMetadata.builder()
.setName(column.getName())
.setType(column.getType())
.setHidden(column.getColumnType() == SYNTHESIZED)
.setComment(Optional.ofNullable(comment))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.VarcharType;

import javax.annotation.Nullable;

import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
Expand Down Expand Up @@ -100,31 +104,36 @@ public static List<DeltaLakeColumnHandle> extractPartitionColumns(List<ColumnMet
.collect(toImmutableList());
}

/**
 * Serializes the given columns as a JSON schema string.
 * <p>
 * The columns are first turned into a struct-type map (with per-column comments looked
 * up by column name) and then written out with the shared object mapper.
 * <p>
 * NOTE(review): old and new signature/return lines are interleaved because this is a
 * diff view; the description follows the two-argument form -- confirm.
 *
 * @param columns columns to encode
 * @param columnComments comments keyed by column name
 * @return the JSON encoding of the schema
 * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when JSON encoding fails
 */
public static String serializeSchemaAsJson(List<DeltaLakeColumnHandle> columns)
public static String serializeSchemaAsJson(List<DeltaLakeColumnHandle> columns, Map<String, String> columnComments)
{
try {
return OBJECT_MAPPER.writeValueAsString(serializeStructType(columns));
return OBJECT_MAPPER.writeValueAsString(serializeStructType(columns, columnComments));
}
catch (JsonProcessingException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to encode Delta Lake schema", e);
}
}

/**
 * Builds the map representation of a top-level struct schema: a "fields" list with one
 * serialized struct field per column, plus a "type" entry set to "struct".
 * <p>
 * In the two-argument form each column's comment is looked up by column name in
 * {@code columnComments}; a missing entry is passed on as {@code null}.
 * <p>
 * NOTE(review): old and new lines are interleaved because this is a diff view -- confirm
 * against the committed file.
 */
private static Map<String, Object> serializeStructType(List<DeltaLakeColumnHandle> columns)
private static Map<String, Object> serializeStructType(List<DeltaLakeColumnHandle> columns, Map<String, String> columnComments)
{
ImmutableMap.Builder<String, Object> schema = ImmutableMap.builder();

schema.put("fields", columns.stream().map(column -> serializeStructField(column.getName(), column.getType())).collect(toImmutableList()));
schema.put("fields", columns.stream().map(column -> serializeStructField(column.getName(), column.getType(), columnComments.get(column.getName()))).collect(toImmutableList()));
schema.put("type", "struct");

return schema.buildOrThrow();
}

private static Map<String, Object> serializeStructField(String name, Type type)
private static Map<String, Object> serializeStructField(String name, Type type, @Nullable String comment)
{
ImmutableMap.Builder<String, Object> fieldContents = ImmutableMap.builder();

fieldContents.put("metadata", ImmutableMap.of());
ImmutableMap.Builder<String, Object> metadata = ImmutableMap.builder();
if (comment != null) {
metadata.put("comment", comment);
}

fieldContents.put("metadata", metadata.buildOrThrow());
fieldContents.put("name", name);
fieldContents.put("nullable", true); // TODO: Is column nullability configurable in Trino?
fieldContents.put("type", serializeColumnType(type));
Expand Down Expand Up @@ -174,7 +183,7 @@ private static Map<String, Object> serializeStructType(RowType rowType)
ImmutableMap.Builder<String, Object> fields = ImmutableMap.builder();

fields.put("type", "struct");
fields.put("fields", rowType.getFields().stream().map(field -> serializeStructField(field.getName().orElse(null), field.getType())).collect(toImmutableList()));
fields.put("fields", rowType.getFields().stream().map(field -> serializeStructField(field.getName().orElse(null), field.getType(), null)).collect(toImmutableList()));

return fields.buildOrThrow();
}
Expand Down Expand Up @@ -282,9 +291,37 @@ private static ColumnMetadata mapColumn(TypeManager typeManager, JsonNode node)
.setName(fieldName)
.setType(buildType(typeManager, typeNode))
.setNullable(nullable)
.setComment(Optional.ofNullable(getComment(node)))
.build();
}

/**
 * Extracts the column comments recorded in the serialized schema of a metadata entry.
 *
 * @param metadataEntry metadata entry whose schema string is parsed
 * @return column comments keyed by column name, as produced by the JSON-based overload
 * @throws IllegalStateException when the metadata entry carries no schema string
 */
public static Map<String, String> getColumnComments(MetadataEntry metadataEntry)
{
    String serializedSchema = metadataEntry.getSchemaString();
    if (serializedSchema == null) {
        throw new IllegalStateException("Serialized schema not found in transaction log for " + metadataEntry.getName());
    }
    return getColumnComments(serializedSchema);
}

/**
 * Parses a serialized schema and collects the comments of the entries in its "fields" node.
 *
 * @param json serialized schema
 * @return immutable map from field name to comment; fields without a comment are left out
 * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the JSON cannot be parsed
 */
private static Map<String, String> getColumnComments(String json)
{
    try {
        ImmutableMap.Builder<String, String> commentsByName = ImmutableMap.builder();
        for (JsonNode field : OBJECT_MAPPER.readTree(json).get("fields")) {
            String fieldName = field.get("name").asText();
            String fieldComment = getComment(field);
            // only fields that actually carry a comment end up in the result
            if (fieldComment != null) {
                commentsByName.put(fieldName, fieldComment);
            }
        }
        return commentsByName.buildOrThrow();
    }
    catch (JsonProcessingException e) {
        throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to parse serialized schema: " + json, e);
    }
}

/**
 * Reads the comment stored under {@code metadata.comment} of a schema field node.
 * <p>
 * {@code path} is used instead of {@code get} so that a field without a "metadata"
 * object yields "no comment" rather than a {@link NullPointerException}, and an explicit
 * JSON {@code null} comment is treated as absent instead of being returned as the
 * literal text "null".
 *
 * @param node JSON node of a single schema field
 * @return the comment text, or {@code null} when the field has no comment
 */
@Nullable
private static String getComment(JsonNode node)
{
    JsonNode comment = node.path("metadata").path("comment");
    if (comment.isMissingNode() || comment.isNull()) {
        return null;
    }
    return comment.asText();
}

private static Type buildType(TypeManager typeManager, JsonNode typeNode)
{
if (typeNode.isContainerNode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_AGGREGATION_PUSHDOWN:
case SUPPORTS_RENAME_TABLE:
case SUPPORTS_ADD_COLUMN_WITH_COMMENT:
case SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT:
case SUPPORTS_DROP_COLUMN:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_COMMENT_ON_TABLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void testSerializeSchemaAsJson()
URL expected = getResource("io/trino/plugin/deltalake/transactionlog/schema/nested_schema.json");
ObjectMapper objectMapper = new ObjectMapper();

String jsonEncoding = serializeSchemaAsJson(ImmutableList.of(arrayColumn, structColumn, mapColumn));
String jsonEncoding = serializeSchemaAsJson(ImmutableList.of(arrayColumn, structColumn, mapColumn), ImmutableMap.of());
assertEquals(objectMapper.readTree(jsonEncoding), objectMapper.readTree(expected));
}

Expand All @@ -202,7 +202,7 @@ public void testRoundTripComplexSchema()
.map(metadata -> new DeltaLakeColumnHandle(metadata.getName(), metadata.getType(), REGULAR))
.collect(toImmutableList());
ObjectMapper objectMapper = new ObjectMapper();
assertEquals(objectMapper.readTree(serializeSchemaAsJson(columnHandles)), objectMapper.readTree(json));
assertEquals(objectMapper.readTree(serializeSchemaAsJson(columnHandles, ImmutableMap.of())), objectMapper.readTree(json));
}

@Test(dataProvider = "supportedTypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,60 @@ private static String getTableCommentOnDelta(String schemaName, String tableName
.map(row -> row.get(1))
.findFirst().orElseThrow();
}

/**
 * Creates a table with a column comment through Trino and checks that the comment is
 * visible from both Trino and Delta, before and after adding another column.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testCreateTableWithColumnCommentOnTrino()
{
    String tableName = "test_dl_create_column_comment_" + randomTableSuffix();
    String tableDirectory = "databricks-compatibility-test-" + tableName;
    String createTableSql = format(
            "CREATE TABLE delta.default.%s (col INT COMMENT 'test comment') WITH (location = 's3://%s/%s')",
            tableName,
            bucketName,
            tableDirectory);
    onTrino().executeQuery(createTableSql);

    try {
        assertEquals(getColumnCommentOnTrino("default", tableName, "col"), "test comment");
        assertEquals(getColumnCommentOnDelta("default", tableName, "col"), "test comment");

        // Verify that adding a new column doesn't remove existing column comments
        String addColumnSql = "ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col INT";
        onTrino().executeQuery(addColumnSql);
        assertEquals(getColumnCommentOnTrino("default", tableName, "col"), "test comment");
        assertEquals(getColumnCommentOnDelta("default", tableName, "col"), "test comment");
    }
    finally {
        String dropTableSql = "DROP TABLE delta.default." + tableName;
        onTrino().executeQuery(dropTableSql);
    }
}

/**
 * Creates a table with a column comment through Delta and checks that Trino reports
 * the same comment.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
public void testCreateTableWithColumnCommentOnDelta()
{
    String tableName = "test_dl_create_column_comment_" + randomTableSuffix();
    String tableDirectory = "databricks-compatibility-test-" + tableName;
    String createTableSql = format(
            "CREATE TABLE default.%s (col INT COMMENT 'test comment') USING DELTA LOCATION 's3://%s/%s'",
            tableName,
            bucketName,
            tableDirectory);
    onDelta().executeQuery(createTableSql);

    try {
        String commentSeenByTrino = getColumnCommentOnTrino("default", tableName, "col");
        assertEquals(commentSeenByTrino, "test comment");
    }
    finally {
        String dropTableSql = "DROP TABLE default." + tableName;
        onDelta().executeQuery(dropTableSql);
    }
}

/**
 * Queries {@code information_schema.columns} through Trino and returns the comment of
 * the given column, taken from the first cell of the first result row.
 */
private static String getColumnCommentOnTrino(String schemaName, String tableName, String columnName)
{
    String commentQuery = "SELECT comment FROM information_schema.columns WHERE table_schema = '" + schemaName + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'";
    QueryResult commentRows = onTrino().executeQuery(commentQuery);
    return (String) commentRows.row(0).get(0);
}

/**
 * Runs {@code DESCRIBE schema.table column} through Delta and returns the value found in
 * the second cell of the third result row.
 */
private static String getColumnCommentOnDelta(String schemaName, String tableName, String columnName)
{
    String describeQuery = format("DESCRIBE %s.%s %s", schemaName, tableName, columnName);
    QueryResult description = onDelta().executeQuery(describeQuery);
    // NOTE(review): presumably row 2 of the DESCRIBE output is the comment row -- confirm against Databricks output
    return (String) description.row(2).get(1);
}
}

0 comments on commit fff9862

Please sign in to comment.