Skip to content

Commit

Permalink
Add materialized view column support for Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-lyutenko authored and Praveen2112 committed Aug 2, 2023
1 parent 4e0711d commit 6168881
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ The Iceberg connector supports setting comments on the following objects:
- tables
- views
- table columns
- materialized view columns

The ``COMMENT`` option is supported on both the table and the table columns for
the :doc:`/sql/create-table` operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static IcebergMaterializedViewDefinition fromConnectorMaterializedViewDef
definition.getCatalog(),
definition.getSchema(),
definition.getColumns().stream()
.map(column -> new Column(column.getName(), column.getType()))
.map(column -> new Column(column.getName(), column.getType(), column.getComment()))
.collect(toImmutableList()),
definition.getGracePeriod(),
definition.getComment());
Expand Down Expand Up @@ -159,14 +159,17 @@ public static final class Column
{
private final String name;
private final TypeId type;
private final Optional<String> comment;

@JsonCreator
public Column(
@JsonProperty("name") String name,
@JsonProperty("type") TypeId type)
@JsonProperty("type") TypeId type,
@JsonProperty("comment") Optional<String> comment)
{
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
this.comment = requireNonNull(comment, "comment is null");
}

@JsonProperty
Expand All @@ -181,6 +184,12 @@ public TypeId getType()
return type;
}

@JsonProperty
public Optional<String> getComment()
{
return comment;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,12 @@ public void setViewColumnComment(ConnectorSession session, SchemaTableName viewN
catalog.updateViewColumnComment(session, viewName, columnName, comment);
}

@Override
public void setMaterializedViewColumnComment(ConnectorSession session, SchemaTableName viewName, String columnName, Optional<String> comment)
{
catalog.updateMaterializedViewColumnComment(session, viewName, columnName, comment);
}

@Override
public Optional<ConnectorTableLayout> getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition(
definition.getCatalog(),
definition.getSchema(),
definition.getColumns().stream()
.map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType()))
.map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType(), column.getComment()))
.collect(toImmutableList()),
definition.getGracePeriod(),
definition.getComment(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Transaction newCreateTableTransaction(

void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment);

void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment);

String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName);

void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,46 @@ public void createMaterializedView(
}
}

@Override
public void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName viewName, String columnName, Optional<String> comment)
{
ConnectorMaterializedViewDefinition definition = doGetMaterializedView(session, viewName)
.orElseThrow(() -> new ViewNotFoundException(viewName));
ConnectorMaterializedViewDefinition newDefinition = new ConnectorMaterializedViewDefinition(
definition.getOriginalSql(),
definition.getStorageTable(),
definition.getCatalog(),
definition.getSchema(),
definition.getColumns().stream()
.map(currentViewColumn -> Objects.equals(columnName, currentViewColumn.getName()) ? new ConnectorMaterializedViewDefinition.Column(currentViewColumn.getName(), currentViewColumn.getType(), comment) : currentViewColumn)
.collect(toImmutableList()),
definition.getGracePeriod(),
definition.getComment(),
definition.getOwner(),
definition.getProperties());

updateMaterializedView(session, viewName, newDefinition);
}

private void updateMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition newDefinition)
{
TableInput materializedViewTableInput = getMaterializedViewTableInput(
viewName.getTableName(),
encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(newDefinition)),
session.getUser(),
createMaterializedViewProperties(session, newDefinition.getStorageTable().orElseThrow().getSchemaTableName()));

try {
stats.getUpdateTable().call(() ->
glueClient.updateTable(new UpdateTableRequest()
.withDatabaseName(viewName.getSchemaName())
.withTableInput(materializedViewTableInput)));
}
catch (AmazonServiceException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, e);
}
}

@Override
public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.connector.ViewNotFoundException;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.BaseTable;
Expand All @@ -55,6 +56,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -528,6 +530,47 @@ public void createMaterializedView(
metastore.createTable(table, principalPrivileges);
}

@Override
public void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName viewName, String columnName, Optional<String> comment)
{
io.trino.plugin.hive.metastore.Table existing = metastore.getTable(viewName.getSchemaName(), viewName.getTableName())
.orElseThrow(() -> new ViewNotFoundException(viewName));

if (!isTrinoMaterializedView(existing.getTableType(), existing.getParameters())) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Existing table is not a Materialized View: " + viewName);
}
ConnectorMaterializedViewDefinition definition = doGetMaterializedView(session, viewName)
.orElseThrow(() -> new ViewNotFoundException(viewName));

ConnectorMaterializedViewDefinition newDefinition = new ConnectorMaterializedViewDefinition(
definition.getOriginalSql(),
definition.getStorageTable(),
definition.getCatalog(),
definition.getSchema(),
definition.getColumns().stream()
.map(currentViewColumn -> Objects.equals(columnName, currentViewColumn.getName())
? new ConnectorMaterializedViewDefinition.Column(currentViewColumn.getName(), currentViewColumn.getType(), comment)
: currentViewColumn)
.collect(toImmutableList()),
definition.getGracePeriod(),
definition.getComment(),
definition.getOwner(),
definition.getProperties());

replaceMaterializedView(session, viewName, existing, newDefinition);
}

private void replaceMaterializedView(ConnectorSession session, SchemaTableName viewName, io.trino.plugin.hive.metastore.Table view, ConnectorMaterializedViewDefinition newDefinition)
{
io.trino.plugin.hive.metastore.Table.Builder viewBuilder = io.trino.plugin.hive.metastore.Table.builder(view)
.setViewOriginalText(Optional.of(
encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(newDefinition))));

PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser());

metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), viewBuilder.build(), principalPrivileges);
}

@Override
public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for Iceberg JDBC catalogs");
}

@Override
public void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
{
throw new TrinoException(NOT_SUPPORTED, "updateMaterializedViewColumnComment is not supported for Iceberg JDBC catalogs");
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for Iceberg Nessie catalogs");
}

@Override
public void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
{
throw new TrinoException(NOT_SUPPORTED, "updateMaterializedViewColumnComment is not supported for Iceberg Nessie catalogs");
}

@Override
public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for Iceberg REST catalog");
}

@Override
public void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
{
throw new TrinoException(NOT_SUPPORTED, "updateMaterializedViewColumnComment is not supported for Iceberg REST catalog");
}

private SessionCatalog.SessionContext convert(ConnectorSession session)
{
return switch (sessionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ public void testShowTables()
assertUpdate("DROP MATERIALIZED VIEW materialized_view_show_tables_test");
}

@Test
public void testCommentColumnMaterializedView()
{
String viewColumnName = "_bigint";
String materializedViewName = "test_materialized_view_" + randomNameSuffix();
assertUpdate(format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM base_table1", materializedViewName));
assertUpdate(format("COMMENT ON COLUMN %s.%s IS 'new comment'", materializedViewName, viewColumnName));
assertThat(getColumnComment(materializedViewName, viewColumnName)).isEqualTo("new comment");
assertQuery(format("SELECT count(*) FROM %s", materializedViewName), "VALUES 6");
assertUpdate(format("DROP MATERIALIZED VIEW %s", materializedViewName));
}

@Test
public void testMaterializedViewsMetadata()
{
Expand Down Expand Up @@ -777,6 +789,11 @@ public Object[][] testTemporalPartitioningDataProvider()
};
}

protected String getColumnComment(String tableName, String columnName)
{
return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'");
}

private SchemaTableName getStorageTable(String materializedViewName)
{
return getStorageTable(getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), materializedViewName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.regex.Pattern;

import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_VIEW;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_VIEW_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_MATERIALIZED_VIEW;
Expand Down Expand Up @@ -660,6 +661,49 @@ public void testCommentViewColumn()
}
}

@Test
public void testCommentMaterializedViewColumn()
{
if (!hasBehavior(SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN)) {
if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) {
String viewName = "test_materialized_view_" + randomNameSuffix();
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM nation");
assertQueryFails("COMMENT ON COLUMN " + viewName + ".regionkey IS 'new region key comment'", "This connector does not support setting materialized view column comments");
assertUpdate("DROP MATERIALIZED VIEW " + viewName);
return;
}
throw new SkipException("Skipping as connector does not support MATERIALIZED VIEW COLUMN COMMENT");
}

String viewName = "test_materialized_view_" + randomNameSuffix();
try {
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM nation");

// comment set
assertUpdate("COMMENT ON COLUMN " + viewName + ".regionkey IS 'new region key comment'");
assertThat(getColumnComment(viewName, "regionkey")).isEqualTo("new region key comment");

// comment updated
assertUpdate("COMMENT ON COLUMN " + viewName + ".regionkey IS 'updated region key comment'");
assertThat(getColumnComment(viewName, "regionkey")).isEqualTo("updated region key comment");

// refresh materialized view
assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 25);
assertThat(getColumnComment(viewName, "regionkey")).isEqualTo("updated region key comment");

// comment set to empty
assertUpdate("COMMENT ON COLUMN " + viewName + ".regionkey IS ''");
assertThat(getColumnComment(viewName, "regionkey")).isEqualTo("");

// comment deleted
assertUpdate("COMMENT ON COLUMN " + viewName + ".regionkey IS NULL");
assertThat(getColumnComment(viewName, "regionkey")).isEqualTo(null);
}
finally {
assertUpdate("DROP MATERIALIZED VIEW " + viewName);
}
}

protected String getTableComment(String tableName)
{
return (String) computeScalar(format("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = '%s' AND schema_name = '%s' AND table_name = '%s'", getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ADD_FIELD;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ARRAY;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_TABLE;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_VIEW;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_COMMENT_ON_VIEW_COLUMN;
Expand Down Expand Up @@ -1584,6 +1585,48 @@ private Optional<ZonedDateTime> getMaterializedViewLastFreshTime(String material
return Optional.ofNullable(lastFreshTime);
}

@Test
public void testColumnCommentMaterializedView()
{
if (!hasBehavior(SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN)) {
if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) {
String viewName = "test_materialized_view_" + randomNameSuffix();
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM nation");
assertQueryFails("COMMENT ON COLUMN " + viewName + ".regionkey IS 'new region key comment'", "This connector does not support setting materialized view column comments");
assertUpdate("DROP MATERIALIZED VIEW " + viewName);
return;
}
throw new SkipException("Skipping as connector does not support MATERIALIZED VIEW COLUMN COMMENT");
}

String viewName = "test_materialized_view_" + randomNameSuffix();
try {
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM nation");

assertUpdate("COMMENT ON COLUMN " + viewName + ".name IS 'new comment'");
assertThat(getColumnComment(viewName, "name")).isEqualTo("new comment");

// comment deleted
assertUpdate("COMMENT ON COLUMN " + viewName + ".name IS NULL");
assertThat(getColumnComment(viewName, "name")).isEqualTo(null);

// comment set to non-empty value before verifying setting empty comment
assertUpdate("COMMENT ON COLUMN " + viewName + ".name IS 'updated comment'");
assertThat(getColumnComment(viewName, "name")).isEqualTo("updated comment");

// refresh materialized view
assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 25);
assertThat(getColumnComment(viewName, "name")).isEqualTo("updated comment");

// comment set to empty
assertUpdate("COMMENT ON COLUMN " + viewName + ".name IS ''");
assertThat(getColumnComment(viewName, "name")).isEmpty();
}
finally {
assertUpdate("DROP MATERIALIZED VIEW " + viewName);
}
}

@Test
public void testCompatibleTypeChangeForView()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public enum TestingConnectorBehavior
SUPPORTS_MATERIALIZED_VIEW_FRESHNESS_FROM_BASE_TABLES(SUPPORTS_CREATE_MATERIALIZED_VIEW),
SUPPORTS_RENAME_MATERIALIZED_VIEW(SUPPORTS_CREATE_MATERIALIZED_VIEW),
SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS(SUPPORTS_RENAME_MATERIALIZED_VIEW),
SUPPORTS_COMMENT_ON_MATERIALIZED_VIEW_COLUMN(SUPPORTS_CREATE_MATERIALIZED_VIEW),

SUPPORTS_NOT_NULL_CONSTRAINT(SUPPORTS_CREATE_TABLE),
SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT(and(SUPPORTS_NOT_NULL_CONSTRAINT, SUPPORTS_ADD_COLUMN)),
Expand Down

0 comments on commit 6168881

Please sign in to comment.