Skip to content

Commit

Permalink
Add Iceberg support for ALTER TABLE ... SET PROPERTIES
Browse files Browse the repository at this point in the history
The two properties which can be set are `format` and `format_version`.
  • Loading branch information
alexjo2144 authored and findepi committed May 5, 2022
1 parent 74b324c commit 9fca255
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 1 deletion.
19 changes: 19 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,25 @@ otherwise the procedure will fail with similar message:
``Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)``.
The default value for this property is ``7d``.

.. _iceberg-alter-table-set-properties:

ALTER TABLE SET PROPERTIES
^^^^^^^^^^^^^^^^^^^^^^^^^^

The connector supports modifying the properties on existing tables using
:ref:`ALTER TABLE SET PROPERTIES <alter-table-set-properties>`.

The following table properties can be updated after a table is created:

* ``format``
* ``format_version``

For example, to update a table from v1 of the Iceberg specification to v2:

.. code-block:: sql
ALTER TABLE table_name SET PROPERTIES format_version = 2;
.. _iceberg-type-mapping:

Type mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;

Expand Down Expand Up @@ -146,6 +147,7 @@
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
Expand Down Expand Up @@ -190,6 +192,7 @@
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;

public class IcebergMetadata
Expand Down Expand Up @@ -1146,6 +1149,52 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
catalog.renameTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), newTable);
}

@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
BaseTable icebergTable = (BaseTable) catalog.loadTable(session, table.getSchemaTableName());

transaction = icebergTable.newTransaction();
UpdateProperties updateProperties = transaction.updateProperties();

for (Map.Entry<String, Optional<Object>> propertyEntry : properties.entrySet()) {
String trinoPropertyName = propertyEntry.getKey();
Optional<Object> propertyValue = propertyEntry.getValue();

switch (trinoPropertyName) {
case FILE_FORMAT_PROPERTY:
updateProperties.defaultFormat(((IcebergFileFormat) propertyValue.orElseThrow()).toIceberg());
break;
case FORMAT_VERSION_PROPERTY:
// UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version
updateProperty(updateProperties, FORMAT_VERSION, propertyValue, formatVersion -> Integer.toString((int) formatVersion));
break;
default:
// TODO: Support updating partitioning https://github.com/trinodb/trino/issues/12174
throw new TrinoException(NOT_SUPPORTED, "Updating the " + trinoPropertyName + " property is not supported");
}
}

try {
updateProperties.commit();
transaction.commitTransaction();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e);
}
}

private static void updateProperty(UpdateProperties updateProperties, String icebergPropertyName, Optional<Object> value, Function<Object, String> toIcebergString)
{
if (value.isPresent()) {
updateProperties.set(icebergPropertyName, toIcebergString.apply(value.get()));
}
else {
updateProperties.remove(icebergPropertyName);
}
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DELETE;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.assertions.Assert.assertEventually;
Expand Down Expand Up @@ -3439,6 +3438,35 @@ public void testIfDeletesReturnsNumberOfRemovedRows()
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2);
}

@Test
public void testUpdatingFileFormat()
{
String tableName = "test_updating_file_format_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " WITH (format = 'orc') AS SELECT * FROM nation WHERE nationkey < 10", "SELECT count(*) FROM nation WHERE nationkey < 10");
assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.format.default'", "VALUES 'ORC'");

assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format = 'parquet'");
assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.format.default'", "VALUES 'PARQUET'");
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", "SELECT count(*) FROM nation WHERE nationkey >= 10");

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE file_path LIKE '%.orc'", "VALUES 1");
assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE file_path LIKE '%.parquet'", "VALUES 1");

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testUpdatingInvalidTableProperty()
{
String tableName = "test_updating_invalid_table_property_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " (a INT, b INT)");
assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES not_a_valid_table_property = 'a value'"))
.hasMessage("Catalog 'iceberg' table property 'not_a_valid_table_property' does not exist");
assertUpdate("DROP TABLE " + tableName);
}

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static io.trino.tpch.TpchTable.NATION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestIcebergV2
extends AbstractTestQueryFramework
Expand Down Expand Up @@ -172,6 +174,39 @@ public void testV2TableWithEqualityDelete()
assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
}

@Test
public void testUpgradeTableToV2FromTrino()
{
String tableName = "test_upgrade_table_to_v2_from_trino_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertEquals(loadTable(tableName).operations().current().formatVersion(), 1);
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2");
assertEquals(loadTable(tableName).operations().current().formatVersion(), 2);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}

@Test
public void testDowngradingV2TableToV1Fails()
{
String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertEquals(loadTable(tableName).operations().current().formatVersion(), 2);
assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1"))
.hasMessage("Failed to commit new table properties")
.getRootCause()
.hasMessage("Cannot downgrade v2 table to v1");
}

@Test
public void testUpgradingToInvalidVersionFails()
{
String tableName = "test_upgrading_to_invalid_version_fails_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertEquals(loadTable(tableName).operations().current().formatVersion(), 2);
assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42"))
.hasMessage("Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2");
}

private void writeEqualityDeleteToNationTable(Table icebergTable)
throws Exception
{
Expand Down

0 comments on commit 9fca255

Please sign in to comment.