From d868e910c944ea9bc35ef9ab83b560f811b8bd21 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Sat, 30 Jul 2022 12:37:52 +0900 Subject: [PATCH] Support JDBC catalog in Iceberg connector --- docs/src/main/sphinx/connector/iceberg.rst | 27 +- plugin/trino-iceberg/pom.xml | 23 ++ .../io/trino/plugin/iceberg/CatalogType.java | 1 + .../iceberg/catalog/IcebergCatalogModule.java | 3 + .../jdbc/IcebergJdbcCatalogModule.java | 49 +++ .../catalog/jdbc/IcebergJdbcClient.java | 83 ++++ .../catalog/jdbc/IcebergJdbcConfig.java | 69 ++++ .../jdbc/IcebergJdbcConnectionFactory.java | 43 +++ .../jdbc/IcebergJdbcTableOperations.java | 70 ++++ .../IcebergJdbcTableOperationsProvider.java | 59 +++ .../catalog/jdbc/TrinoJdbcCatalog.java | 354 ++++++++++++++++++ .../catalog/jdbc/TrinoJdbcCatalogFactory.java | 94 +++++ .../plugin/iceberg/IcebergQueryRunner.java | 8 +- .../plugin/iceberg/TestIcebergPlugin.java | 16 + .../jdbc/TestIcebergIcebergJdbcConfig.java | 52 +++ ...tIcebergJdbcCatalogConnectorSmokeTest.java | 144 +++++++ .../jdbc/TestIcebergJdbcConnectorTest.java | 261 +++++++++++++ .../jdbc/TestingIcebergJdbcServer.java | 80 ++++ .../jdbc/TestingTrinoIcebergJdbcUtil.java | 23 ++ .../EnvSinglenodeSparkIcebergJdbcCatalog.java | 134 +++++++ .../launcher/suite/suites/SuiteIceberg.java | 4 + .../apply-hive-config-for-iceberg.sh | 5 + .../create-table.sql | 20 + .../hive-site-overrides.xml | 9 + .../iceberg.properties | 6 + .../spark-defaults.conf | 13 + .../tpch.properties | 1 + .../io/trino/tests/product/TestGroups.java | 1 + .../TestIcebergSparkCompatibility.java | 152 +++++--- 29 files changed, 1739 insertions(+), 65 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcClient.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConfig.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperationsProvider.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergIcebergJdbcConfig.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java create mode 100644 plugin/trino-iceberg/src/test/java/org/apache/iceberg/jdbc/TestingTrinoIcebergJdbcUtil.java create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java create mode 100755 
testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/tpch.properties

diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst
index f2b103be9936..ef2deb8ae1be 100644
--- a/docs/src/main/sphinx/connector/iceberg.rst
+++ b/docs/src/main/sphinx/connector/iceberg.rst
@@ -47,7 +47,7 @@ Configuration
 The connector supports multiple Iceberg catalog types, you may use either a Hive metastore service (HMS), AWS Glue, or a REST catalog. The catalog type is determined by the
-``iceberg.catalog.type`` property, it can be set to ``HIVE_METASTORE``, ``GLUE``, or ``REST``.
+``iceberg.catalog.type`` property, which can be set to ``HIVE_METASTORE``, ``GLUE``, ``JDBC``, or ``REST``.

 .. _iceberg-hive-catalog:

@@ -80,6 +80,8 @@ configuration properties as the Hive connector's Glue setup. See
     connector.name=iceberg
     iceberg.catalog.type=glue
+.. _iceberg-rest-catalog:
+
 REST catalog
 ^^^^^^^^^^^^^^

@@ -118,6 +120,29 @@ Property Name Description
     iceberg.rest-catalog.uri=http://iceberg-with-rest:8181
+.. _iceberg-jdbc-catalog:
+
+JDBC catalog
+^^^^^^^^^^^^
+
+.. warning::
+
+   The JDBC catalog may have compatibility issues if Iceberg introduces breaking changes in the future.
+   Consider the :ref:`REST catalog <iceberg-rest-catalog>` as an alternative solution.
+
+At a minimum, ``iceberg.jdbc-catalog.connection-url`` and
+``iceberg.jdbc-catalog.catalog-name`` must be configured.
+When using any database besides PostgreSQL, a JDBC driver jar file must be placed in the plugin directory.
+
+.. 
code-block:: text + + connector.name=iceberg + iceberg.catalog.type=jdbc + iceberg.jdbc-catalog.catalog-name=test + iceberg.jdbc-catalog.connection-url=jdbc:postgresql://example.net:5432/database?user=admin&password=test + iceberg.jdbc-catalog.default-warehouse-dir=s3://bucket + + General configuration ^^^^^^^^^^^^^^^^^^^^^ diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 525bd856fa1a..39963ce142b3 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -216,6 +216,11 @@ iceberg-parquet + + org.jdbi + jdbi3-core + + org.roaringbitmap RoaringBitmap @@ -297,6 +302,12 @@ runtime + + org.postgresql + postgresql + runtime + + io.trino @@ -452,6 +463,18 @@ test + + org.testcontainers + postgresql + test + + + + org.testcontainers + testcontainers + test + + org.testng testng diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java index 0e972cc41538..ed2ffb761051 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java @@ -19,5 +19,6 @@ public enum CatalogType HIVE_METASTORE, GLUE, REST, + JDBC, /**/; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java index ac7b8149d810..a2cf660043fc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java @@ -21,11 +21,13 @@ import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule; import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; +import io.trino.plugin.iceberg.catalog.jdbc.IcebergJdbcCatalogModule; import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogModule; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; +import static io.trino.plugin.iceberg.CatalogType.JDBC; import static io.trino.plugin.iceberg.CatalogType.REST; import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; @@ -39,6 +41,7 @@ protected void setup(Binder binder) bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); bindCatalogModule(REST, new IcebergRestCatalogModule()); + bindCatalogModule(JDBC, new IcebergJdbcCatalogModule()); } private void bindCatalogModule(CatalogType catalogType, Module module) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java new file mode 100644 index 000000000000..cd80f84486a8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class IcebergJdbcCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(IcebergJdbcConfig.class); + binder.bind(IcebergTableOperationsProvider.class).to(IcebergJdbcTableOperationsProvider.class).in(Scopes.SINGLETON); + newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName(); + binder.bind(TrinoCatalogFactory.class).to(TrinoJdbcCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(TrinoJdbcCatalogFactory.class); + newExporter(binder).export(TrinoJdbcCatalogFactory.class).withGeneratedName(); + } + + @Provides + @Singleton + public static IcebergJdbcClient createIcebergJdbcClient(IcebergJdbcConfig config) + { + return new IcebergJdbcClient( + new IcebergJdbcConnectionFactory(config.getConnectionUrl()), + config.getCatalogName()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcClient.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcClient.java new file mode 100644 index 000000000000..66db3a79342c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcClient.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class IcebergJdbcClient +{ + private final IcebergJdbcConnectionFactory connectionFactory; + private final String catalogName; + + public IcebergJdbcClient(IcebergJdbcConnectionFactory connectionFactory, String catalogName) + { + this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory is null"); + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + } + + public void createTable(String schemaName, String tableName, String metadataLocation) + { + try (Handle handle = Jdbi.open(connectionFactory)) { + handle.createUpdate("" + + "INSERT INTO iceberg_tables " + + "(catalog_name, table_namespace, table_name, metadata_location, previous_metadata_location) " + + "VALUES (:catalog, :schema, :table, :metadata_location, null)") + .bind("catalog", catalogName) + .bind("schema", schemaName) + .bind("table", tableName) + .bind("metadata_location", metadataLocation) + .execute(); + } + } + + public void alterTable(String schemaName, String tableName, String newMetadataLocation, String previousMetadataLocation) + { + try (Handle handle = Jdbi.open(connectionFactory)) { + int updatedRecords = handle.createUpdate("" + + "UPDATE iceberg_tables " + + "SET metadata_location = :metadata_location, previous_metadata_location = :previous_metadata_location " + + "WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table AND metadata_location = :previous_metadata_location") + .bind("metadata_location", newMetadataLocation) + .bind("previous_metadata_location", previousMetadataLocation) + .bind("catalog", catalogName) + .bind("schema", schemaName) + .bind("table", tableName) + .execute(); + if (updatedRecords != 1) { + throw new CommitFailedException("Failed to update table due to concurrent updates"); + } + } + } + + public Optional getMetadataLocation(String schemaName, String tableName) + { + try (Handle handle = Jdbi.open(connectionFactory)) { + return handle.createQuery("" + + "SELECT metadata_location " + + "FROM iceberg_tables " + + "WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table") + .bind("catalog", catalogName) + .bind("schema", schemaName) + .bind("table", tableName) + .mapTo(String.class) + .findOne(); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConfig.java new file mode 100644 index 000000000000..c6c6c3d8c4b3 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConfig.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; + +import javax.validation.constraints.NotEmpty; + +public class IcebergJdbcConfig +{ + private String connectionUrl; + private String catalogName; + private String defaultWarehouseDir; + + public String getConnectionUrl() + { + return connectionUrl; + } + + @Config("iceberg.jdbc-catalog.connection-url") + @ConfigDescription("The URI to connect to the JDBC server") + @ConfigSecuritySensitive + public IcebergJdbcConfig setConnectionUrl(String connectionUrl) + { + this.connectionUrl = connectionUrl; + return this; + } + + @NotEmpty + public String getCatalogName() + { + return catalogName; + } + + @Config("iceberg.jdbc-catalog.catalog-name") + @ConfigDescription("Iceberg JDBC metastore catalog name") + public IcebergJdbcConfig setCatalogName(String catalogName) + { + this.catalogName = catalogName; + return this; + } + + @NotEmpty + public String getDefaultWarehouseDir() + { + return defaultWarehouseDir; + } + + @Config("iceberg.jdbc-catalog.default-warehouse-dir") + @ConfigDescription("The default warehouse directory to use for JDBC") + public IcebergJdbcConfig setDefaultWarehouseDir(String defaultWarehouseDir) + { + this.defaultWarehouseDir = defaultWarehouseDir; + return this; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java new file mode 100644 index 000000000000..b59ce22f3740 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import org.jdbi.v3.core.ConnectionFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class IcebergJdbcConnectionFactory + implements ConnectionFactory +{ + private final String connectionUrl; + + public IcebergJdbcConnectionFactory(String connectionUrl) + { + this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null"); + } + + @Override + public Connection openConnection() + throws SQLException + { + Connection connection = DriverManager.getConnection(connectionUrl); + checkState(connection != null, "Driver returned null connection, make sure the connection URL is valid"); + return connection; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java new file mode 100644 index 000000000000..1b7e3bda1afa --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class IcebergJdbcTableOperations + extends AbstractIcebergTableOperations +{ + private final IcebergJdbcClient jdbcClient; + + public IcebergJdbcTableOperations( + FileIO fileIo, + IcebergJdbcClient jdbcClient, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, session, database, table, owner, location); + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + } + + @Override + protected String getRefreshedLocation(boolean invalidateCaches) + { + return jdbcClient.getMetadataLocation(database, tableName) + .orElseThrow(() -> new TableNotFoundException(getSchemaTableName())); + } + + @Override + protected void commitNewTable(TableMetadata metadata) + { + verify(version.isEmpty(), "commitNewTable called on a table which already exists"); + String newMetadataLocation = writeNewMetadata(metadata, 0); + jdbcClient.createTable(database, tableName, newMetadataLocation); + shouldRefresh = true; + } + + @Override + protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) + { + checkState(currentMetadataLocation != null, "No current metadata location for existing table"); + String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1); + jdbcClient.alterTable(database, tableName, newMetadataLocation, currentMetadataLocation); + shouldRefresh = true; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperationsProvider.java new file mode 100644 index 000000000000..3768191e3ae7 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperationsProvider.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.connector.ConnectorSession; + +import javax.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class IcebergJdbcTableOperationsProvider + implements IcebergTableOperationsProvider +{ + private final TrinoFileSystemFactory fileSystemFactory; + private final IcebergJdbcClient jdbcClient; + + @Inject + public IcebergJdbcTableOperationsProvider(IcebergJdbcClient jdbcClient, TrinoFileSystemFactory fileSystemFactory) + { + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + } + + @Override + public IcebergTableOperations createTableOperations( + TrinoCatalog catalog, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + return new IcebergJdbcTableOperations( + fileSystemFactory.create(session).toFileIo(), + jdbcClient, + session, + database, + table, + owner, + location); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java new file mode 100644 index 000000000000..f0009c12a88c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -0,0 +1,354 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.TypeManager; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.jdbc.JdbcCatalog; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Maps.transformValues; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogUtil.dropTableData; + +public class TrinoJdbcCatalog + extends AbstractTrinoCatalog +{ + private final JdbcCatalog jdbcCatalog; + private final TrinoFileSystemFactory fileSystemFactory; + private final String defaultWarehouseDir; + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + public TrinoJdbcCatalog( + CatalogName catalogName, + TypeManager typeManager, + IcebergTableOperationsProvider tableOperationsProvider, + JdbcCatalog jdbcCatalog, + TrinoFileSystemFactory fileSystemFactory, + boolean useUniqueTableLocation, + String defaultWarehouseDir) + { + super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation); + this.jdbcCatalog = requireNonNull(jdbcCatalog, "jdbcCatalog is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + this.defaultWarehouseDir = requireNonNull(defaultWarehouseDir, "defaultWarehouseDir is null"); + } + + @Override + public boolean namespaceExists(ConnectorSession session, String namespace) + { + return jdbcCatalog.namespaceExists(Namespace.of(namespace)); + } + + @Override + public List listNamespaces(ConnectorSession session) + { + return jdbcCatalog.listNamespaces().stream() + .map(namespace -> namespace.level(0).toLowerCase(ENGLISH)) + 
.collect(toImmutableList()); + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + return jdbcCatalog.loadNamespaceMetadata(Namespace.of(namespace)).entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + return Optional.empty(); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + try { + jdbcCatalog.createNamespace(Namespace.of(namespace), transformValues(properties, String.class::cast)); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to create a namespace " + namespace, e); + } + } + + @Override + public void dropNamespace(ConnectorSession session, String namespace) + { + jdbcCatalog.dropNamespace(Namespace.of(namespace)); + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setNamespacePrincipal is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for Iceberg JDBC catalogs"); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + List namespaces = listNamespaces(session, namespace); + // Build as a set and convert to list for removing duplicate entries due to case difference + ImmutableSet.Builder tablesListBuilder = ImmutableSet.builder(); + for (String schemaName : namespaces) { + try { + jdbcCatalog.listTables(Namespace.of(schemaName)).forEach(table -> tablesListBuilder.add(new SchemaTableName(schemaName, table.name()))); + } + catch (NoSuchNamespaceException e) { + // Namespace may have been deleted + } + } + return tablesListBuilder.build().asList(); + } + + private List listNamespaces(ConnectorSession session, Optional namespace) + { + if (namespace.isPresent() && namespaceExists(session, namespace.get())) { + if ("information_schema".equals(namespace.get())) { + // TODO https://github.com/trinodb/trino/issues/1559 this should be filtered out in engine. 
+ return ImmutableList.of(); + } + return ImmutableList.of(namespace.get()); + } + return listNamespaces(session); + } + + @Override + public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, Schema schema, PartitionSpec partitionSpec, String location, Map properties) + { + if (!listNamespaces(session, Optional.of(schemaTableName.getSchemaName())).contains(schemaTableName.getSchemaName())) { + throw new SchemaNotFoundException(schemaTableName.getSchemaName()); + } + return newCreateTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + location, + properties, + Optional.of(session.getUser())); + } + + @Override + public void registerTable(ConnectorSession session, SchemaTableName tableName, String tableLocation, String metadataLocation) + { + jdbcCatalog.registerTable(TableIdentifier.of(tableName.getSchemaName(), tableName.getTableName()), metadataLocation); + } + + @Override + public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) + { + BaseTable table = (BaseTable) loadTable(session, schemaTableName); + validateTableCanBeDropped(table); + + jdbcCatalog.dropTable(toIdentifier(schemaTableName), false); + dropTableData(table.io(), table.operations().current()); + deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + try { + jdbcCatalog.renameTable(toIdentifier(from), toIdentifier(to)); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to rename table from %s to %s".formatted(from, to), e); + } + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + schemaTableName, + ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + + return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); + } + + @Override + public void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "updateViewComment is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for Iceberg JDBC catalogs"); + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + Namespace namespace = Namespace.of(schemaTableName.getSchemaName()); + String tableName = createNewTableName(schemaTableName.getTableName()); + Optional databaseLocation; + if (!jdbcCatalog.namespaceExists(namespace)) { + databaseLocation = Optional.empty(); + } + else { + databaseLocation = Optional.ofNullable(jdbcCatalog.loadNamespaceMetadata(namespace).get(LOCATION_PROPERTY)); + } + + Path location; + if (databaseLocation.isEmpty()) { + location = new Path(new Path(defaultWarehouseDir, schemaTableName.getSchemaName()), tableName); + } + else { + location = new Path(databaseLocation.get(), tableName); + } + + return location.toString(); + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new 
TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, "createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + return ImmutableMap.of(); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + { + return Optional.empty(); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public Optional redirectTable(ConnectorSession session, SchemaTableName tableName) + { + return Optional.empty(); + } + + private static TableIdentifier toIdentifier(SchemaTableName table) + { + return TableIdentifier.of(table.getSchemaName(), table.getTableName()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java new file mode 100644 index 000000000000..4d129b2b21e9 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.security.ConnectorIdentity; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.jdbc.JdbcCatalog; + +import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.CatalogProperties.URI; +import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; + +public class TrinoJdbcCatalogFactory + implements TrinoCatalogFactory +{ + private final CatalogName catalogName; + private final TypeManager typeManager; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final TrinoFileSystemFactory fileSystemFactory; + private final String jdbcCatalogName; + private final String connectionUrl; + private final String defaultWarehouseDir; + private final boolean isUniqueTableLocation; + + @GuardedBy("this") + private JdbcCatalog icebergCatalog; + + @Inject + public TrinoJdbcCatalogFactory( + CatalogName catalogName, + TypeManager typeManager, + IcebergTableOperationsProvider tableOperationsProvider, + TrinoFileSystemFactory fileSystemFactory, + IcebergJdbcConfig jdbcConfig, + IcebergConfig icebergConfig) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + this.isUniqueTableLocation = requireNonNull(icebergConfig, "icebergConfig is null").isUniqueTableLocation(); + this.jdbcCatalogName = jdbcConfig.getCatalogName(); + this.connectionUrl = jdbcConfig.getConnectionUrl(); + this.defaultWarehouseDir = jdbcConfig.getDefaultWarehouseDir(); + } + + @Override + public synchronized TrinoCatalog create(ConnectorIdentity identity) + { + // Reuse JdbcCatalog instance to avoid JDBC connection leaks + if (icebergCatalog == null) { + icebergCatalog = createJdbcCatalog(); + } + return new TrinoJdbcCatalog( + catalogName, + typeManager, + tableOperationsProvider, + icebergCatalog, + fileSystemFactory, + isUniqueTableLocation, + defaultWarehouseDir); + } + + private JdbcCatalog createJdbcCatalog() + { + JdbcCatalog jdbcCatalog = new JdbcCatalog(); + jdbcCatalog.initialize(jdbcCatalogName, ImmutableMap.builder() + .put(URI, connectionUrl) + .put(WAREHOUSE_LOCATION, defaultWarehouseDir) + .buildOrThrow()); + return jdbcCatalog; + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 
ce968791fe31..5f8a8c351d52 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -124,11 +124,15 @@ public DistributedQueryRunner build() queryRunner.installPlugin(new IcebergPlugin()); Map icebergProperties = new HashMap<>(this.icebergProperties.buildOrThrow()); - if (!icebergProperties.containsKey("iceberg.catalog.type")) { - Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); + String catalogType = icebergProperties.get("iceberg.catalog.type"); + Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); + if (catalogType == null) { icebergProperties.put("iceberg.catalog.type", "TESTING_FILE_METASTORE"); icebergProperties.put("hive.metastore.catalog.dir", dataDir.toString()); } + if ("jdbc".equalsIgnoreCase(catalogType) && !icebergProperties.containsKey("iceberg.jdbc-catalog.default-warehouse-dir")) { + icebergProperties.put("iceberg.jdbc-catalog.default-warehouse-dir", dataDir.toString()); + } queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index 839f2600a073..94fd5886d1d9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -234,6 +234,22 @@ public void testRestCatalog() .shutdown(); } + @Test + public void testJdbcCatalog() + { + ConnectorFactory factory = getConnectorFactory(); + + factory.create( + "test", + Map.of( + "iceberg.catalog.type", "jdbc", + "iceberg.jdbc-catalog.connection-url", "jdbc:postgresql://localhost:5432/test", + "iceberg.jdbc-catalog.catalog-name", "test", + "iceberg.jdbc-catalog.default-warehouse-dir", "s3://bucket"), + new TestingConnectorContext()) + .shutdown(); + } + private static ConnectorFactory getConnectorFactory() { return getOnlyElement(new IcebergPlugin().getConnectorFactories()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergIcebergJdbcConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergIcebergJdbcConfig.java new file mode 100644 index 000000000000..16625625fc23 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergIcebergJdbcConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.iceberg.catalog.jdbc;
+
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+
+public class TestIcebergIcebergJdbcConfig
+{
+    @Test
+    public void testDefaults()
+    {
+        assertRecordedDefaults(recordDefaults(IcebergJdbcConfig.class)
+                .setConnectionUrl(null)
+                .setCatalogName(null)
+                .setDefaultWarehouseDir(null));
+    }
+
+    @Test
+    public void testExplicitPropertyMappings()
+    {
+        Map<String, String> properties = ImmutableMap.<String, String>builder()
+                .put("iceberg.jdbc-catalog.connection-url", "jdbc:postgresql://localhost:5432/test")
+                .put("iceberg.jdbc-catalog.catalog-name", "test")
+                .put("iceberg.jdbc-catalog.default-warehouse-dir", "s3://bucket")
+                .buildOrThrow();
+
+        IcebergJdbcConfig expected = new IcebergJdbcConfig()
+                .setConnectionUrl("jdbc:postgresql://localhost:5432/test")
+                .setCatalogName("test")
+                .setDefaultWarehouseDir("s3://bucket");
+
+        assertFullMapping(properties, expected);
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java
new file mode 100644
index 000000000000..4e2b9986b459
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergJdbcCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + public TestIcebergJdbcCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + } + + @SuppressWarnings("DuplicateBranchesInSwitch") + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_RENAME_SCHEMA -> false; + case SUPPORTS_CREATE_VIEW, SUPPORTS_COMMENT_ON_VIEW, SUPPORTS_COMMENT_ON_VIEW_COLUMN -> false; + case SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_RENAME_MATERIALIZED_VIEW -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "jdbc") + .put("iceberg.jdbc-catalog.connection-url", server.getJdbcUrl()) + .put("iceberg.jdbc-catalog.catalog-name", "tpch") + .buildOrThrow()) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); + } + + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasMessageContaining("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessageContaining("renameNamespace is not supported for Iceberg JDBC catalogs"); + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + // used when registering a table, which is not supported by the JDBC catalog + } + + @Override + protected String getMetadataLocation(String tableName) + { + // used when registering a table, which is not supported by the JDBC catalog + throw new UnsupportedOperationException("metadata location for register_table is not supported"); + } + + @Override + public void testRegisterTableWithTableLocation() + { + assertThatThrownBy(super::testRegisterTableWithTableLocation) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithComments() + { + assertThatThrownBy(super::testRegisterTableWithComments) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithShowCreateTable() + { + assertThatThrownBy(super::testRegisterTableWithShowCreateTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithReInsert() + { + assertThatThrownBy(super::testRegisterTableWithReInsert) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithDroppedTable() + { + assertThatThrownBy(super::testRegisterTableWithDroppedTable) + 
.hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithDifferentTableName() + { + assertThatThrownBy(super::testRegisterTableWithDifferentTableName) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithMetadataFile() + { + assertThatThrownBy(super::testRegisterTableWithMetadataFile) + .hasMessageContaining("metadata location for register_table is not supported"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java new file mode 100644 index 000000000000..045a4df8523e --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java @@ -0,0 +1,261 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.plugin.iceberg.BaseIcebergConnectorTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.annotations.Test; + +import java.util.OptionalInt; + +import static io.trino.tpch.TpchTable.LINE_ITEM; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergJdbcConnectorTest + extends BaseIcebergConnectorTest +{ + public TestIcebergJdbcConnectorTest() + { + super(new IcebergConfig().getFileFormat()); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "jdbc") + .put("iceberg.jdbc-catalog.connection-url", server.getJdbcUrl()) + .put("iceberg.jdbc-catalog.catalog-name", "tpch") + .buildOrThrow()) + .setInitialTables(ImmutableList.>builder() + .addAll(REQUIRED_TPCH_TABLES) + .add(LINE_ITEM) + .build()) + .build(); + } + + @Override + public void testShowCreateSchema() + { + // Override because Iceberg JDBC catalog requires location in the namespace + assertThat(computeActual("SHOW CREATE SCHEMA tpch").getOnlyValue().toString()) + .matches(""" + CREATE SCHEMA iceberg.tpch + WITH \\( + location = '.*' + \\)"""); + } + + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessage("renameNamespace is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testRenameSchemaToLongName() + { + assertThatThrownBy(super::testRenameSchemaToLongName) + .hasMessage("renameNamespace is not 
supported for Iceberg JDBC catalogs"); + } + + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCreateViewSchemaNotFound() + { + assertThatThrownBy(super::testCreateViewSchemaNotFound) + .hasMessageContaining("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testShowCreateView() + { + assertThatThrownBy(super::testShowCreateView) + .hasMessageContaining("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCompatibleTypeChangeForView() + { + assertThatThrownBy(super::testCompatibleTypeChangeForView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCompatibleTypeChangeForView2() + { + assertThatThrownBy(super::testCompatibleTypeChangeForView2) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testDropNonEmptySchemaWithView() + { + assertThatThrownBy(super::testDropNonEmptySchemaWithView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Test(dataProvider = "testViewMetadataDataProvider") + @Override + public void testViewMetadata(String securityClauseInCreate, String securityClauseInShowCreate) + { + assertThatThrownBy(() -> super.testViewMetadata(securityClauseInCreate, securityClauseInShowCreate)) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testViewCaseSensitivity() + { + assertThatThrownBy(super::testViewCaseSensitivity) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testViewAndMaterializedViewTogether() + { + assertThatThrownBy(super::testViewAndMaterializedViewTogether) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCommentView() + { + assertThatThrownBy(super::testCommentView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCommentViewColumn() + { + assertThatThrownBy(super::testCommentViewColumn) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + assertThatThrownBy(super::testReadMetadataWithRelationsConcurrentModifications) + .hasMessageMatching(".* (createView|createMaterializedView) is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Test(dataProvider = "testColumnNameDataProvider") + @Override + public void testMaterializedViewColumnName(String columnName) + { + assertThatThrownBy(() -> super.testMaterializedViewColumnName(columnName)) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testDropNonEmptySchemaWithMaterializedView() + { + assertThatThrownBy(super::testDropNonEmptySchemaWithMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testRenameMaterializedView() + { + assertThatThrownBy(super::testRenameMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void 
testFederatedMaterializedView() + { + assertThatThrownBy(super::testFederatedMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testMaterializedViewSnapshotSummariesHaveTrinoQueryId() + { + assertThatThrownBy(super::testMaterializedViewSnapshotSummariesHaveTrinoQueryId) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + protected void verifyConcurrentAddColumnFailurePermissible(Exception e) + { + assertThat(e) + .hasMessageStartingWith("Failed to add column: Failed to update table due to concurrent updates"); + } + + @Override + protected boolean supportsIcebergFileStatistics(String typeName) + { + return !(typeName.equalsIgnoreCase("varbinary")) && + !(typeName.equalsIgnoreCase("uuid")); + } + + @Override + protected boolean supportsRowGroupStatistics(String typeName) + { + return !typeName.equalsIgnoreCase("varbinary"); + } + + @Override + protected Session withSmallRowGroups(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "10") + .build(); + } + + @Override + protected OptionalInt maxSchemaNameLength() + { + return OptionalInt.of(255); + } + + @Override + protected void verifySchemaNameLengthFailurePermissible(Throwable e) + { + assertThat(e) + .hasMessageContaining("Failed to create a namespace") + .hasStackTraceContaining("ERROR: value too long for type character varying(255)"); + } + + @Override + protected void verifyTableNameLengthFailurePermissible(Throwable e) + { + assertThat(e).hasMessageMatching("Failed to create file.*|.*Failed to rename table.*"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java new file mode 100644 index 000000000000..e5c39a3c39b1 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import org.apache.iceberg.jdbc.TestingTrinoIcebergJdbcUtil; +import org.intellij.lang.annotations.Language; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.io.Closeable; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import static java.lang.String.format; +import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT; + +public class TestingIcebergJdbcServer + implements Closeable +{ + private static final String USER = "test"; + private static final String PASSWORD = "test"; + private static final String DATABASE = "tpch"; + + private final PostgreSQLContainer<?> dockerContainer; + + public TestingIcebergJdbcServer() + { + // TODO: Use Iceberg docker image once the community provides it + dockerContainer = new PostgreSQLContainer<>("postgres:12.10") + .withDatabaseName(DATABASE) + .withUsername(USER) + .withPassword(PASSWORD); + dockerContainer.start(); + + execute(TestingTrinoIcebergJdbcUtil.CREATE_CATALOG_TABLE); + execute(TestingTrinoIcebergJdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE); + + execute("CREATE SCHEMA tpch"); + } + + public void execute(@Language("SQL") String sql) + { + try (Connection connection = DriverManager.getConnection(getJdbcUrl()); + Statement statement = connection.createStatement()) { + statement.execute(sql); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public String getJdbcUrl() + { + return format( + "jdbc:postgresql://%s:%s/%s?user=%s&password=%s&currentSchema=tpch,public", + dockerContainer.getHost(), + dockerContainer.getMappedPort(POSTGRESQL_PORT), + DATABASE, + USER, + PASSWORD); + } + + @Override + public void close() + { + dockerContainer.close(); + } +} diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/jdbc/TestingTrinoIcebergJdbcUtil.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/jdbc/TestingTrinoIcebergJdbcUtil.java new file mode 100644 index 000000000000..f15d86b24b66 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/jdbc/TestingTrinoIcebergJdbcUtil.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.jdbc; + +public final class TestingTrinoIcebergJdbcUtil +{ + public static final String CREATE_CATALOG_TABLE = JdbcUtil.CREATE_CATALOG_TABLE; + public static final String CREATE_NAMESPACE_PROPERTIES_TABLE = JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE; + + private TestingTrinoIcebergJdbcUtil() {} +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java new file mode 100644 index 000000000000..323aa02a30a3 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; +import io.trino.tests.product.launcher.testcontainers.PortBinder; +import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D; +import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TEMPTO_PROFILE_CONFIG; +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public class EnvSinglenodeSparkIcebergJdbcCatalog + extends EnvironmentProvider +{ + private static final int SPARK_THRIFT_PORT = 10213; + // Use non-default PostgreSQL port to avoid conflicts with locally installed PostgreSQL if any. 
+ public static final int POSTGRESQL_PORT = 25432; + + private final DockerFiles dockerFiles; + private final PortBinder portBinder; + private final String imagesVersion; + private final String hadoopImagesVersion; + + @Inject + public EnvSinglenodeSparkIcebergJdbcCatalog(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder) + { + super(ImmutableList.of(standard, hadoop)); + this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); + this.portBinder = requireNonNull(portBinder, "portBinder is null"); + this.imagesVersion = requireNonNull(config, "config is null").getImagesVersion(); + this.hadoopImagesVersion = requireNonNull(config, "config is null").getHadoopImagesVersion(); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + String dockerImageName = "ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion; + + builder.configureContainer(HADOOP, container -> { + container.setDockerImageName(dockerImageName); + container.withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg/apply-hive-config-for-iceberg.sh")), + CONTAINER_HADOOP_INIT_D + "/apply-hive-config-for-iceberg.sh"); + }); + + builder.configureContainer(TESTS, dockerContainer -> { + dockerContainer.withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/tempto/tempto-configuration-for-hive3.yaml")), + CONTAINER_TEMPTO_PROFILE_CONFIG); + }); + + builder.addConnector("iceberg", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties"))); + + builder.addContainer(createPostgreSql()); + + builder.addContainer(createSpark()) + .containerDependsOn("spark", HADOOP); + } + + @SuppressWarnings("resource") + public DockerContainer createPostgreSql() + { + DockerContainer container = new DockerContainer("postgres:14.2", "postgresql") + .withEnv("POSTGRES_PASSWORD", "test") + .withEnv("POSTGRES_USER", "test") + .withEnv("POSTGRES_DB", "test") + .withEnv("PGPORT", Integer.toString(POSTGRESQL_PORT)) + .withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql")), + "/docker-entrypoint-initdb.d/create-table.sql") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(POSTGRESQL_PORT)); + + portBinder.exposePort(container, POSTGRESQL_PORT); + + return container; + } + + @SuppressWarnings("resource") + private DockerContainer createSpark() + { + try { + DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + imagesVersion, "spark") + .withEnv("HADOOP_USER_NAME", "hive") + .withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf")), + "/spark/conf/spark-defaults.conf") + .withCommand( + "spark-submit", + "--master", "local[*]", + "--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", + "--name", "Thrift JDBC/ODBC Server", + "--packages", "org.apache.spark:spark-avro_2.12:3.2.1", + "--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT, + "spark-internal") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(SPARK_THRIFT_PORT)); + + portBinder.exposePort(container, SPARK_THRIFT_PORT); + + return container; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } 
+} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java index 9037cefcb2d7..ed1331a67660 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java @@ -18,6 +18,7 @@ import io.trino.tests.product.launcher.env.EnvironmentDefaults; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeHiveIcebergRedirections; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIceberg; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergJdbcCatalog; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergRest; import io.trino.tests.product.launcher.suite.Suite; import io.trino.tests.product.launcher.suite.SuiteTestRun; @@ -45,6 +46,9 @@ public List getTestRuns(EnvironmentConfig config) .build(), testOnEnvironment(EnvSinglenodeSparkIcebergRest.class) .withGroups("configured_features", "iceberg_rest") + .build(), + testOnEnvironment(EnvSinglenodeSparkIcebergJdbcCatalog.class) + .withGroups("configured_features", "iceberg_jdbc") .build()); } } diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh new file mode 100755 index 000000000000..2dc1e54559ea --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -exuo pipefail + +echo "Applying hive-site configuration overrides for Spark" +apply-site-xml-override /etc/hive/conf/hive-site.xml "/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml" diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql new file mode 100644 index 000000000000..9b22d0a3825b --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql @@ -0,0 +1,20 @@ +/** + Table definition in Iceberg repository: + https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java + */ +CREATE TABLE iceberg_namespace_properties ( + catalog_name VARCHAR(255) NOT NULL, + namespace VARCHAR(255) NOT NULL, + property_key VARCHAR(5500), + property_value VARCHAR(5500), + PRIMARY KEY (catalog_name, namespace, property_key) +); + +CREATE TABLE iceberg_tables ( + catalog_name VARCHAR(255) NOT NULL, + table_namespace VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + metadata_location VARCHAR(5500), + previous_metadata_location VARCHAR(5500), + PRIMARY KEY (catalog_name, table_namespace, table_name) +); diff --git 
a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml new file mode 100644 index 000000000000..b1411b3ab8a9 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml @@ -0,0 +1,9 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<configuration> + <property> + <name>hive.metastore.disallow.incompatible.col.type.changes</name> + <value>false</value> + </property> +</configuration> diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties new file mode 100644 index 000000000000..b4ec149f6e6d --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties @@ -0,0 +1,6 @@ +connector.name=iceberg +iceberg.catalog.type=jdbc +iceberg.jdbc-catalog.connection-url=jdbc:postgresql://postgresql:25432/test?user=test&password=test +iceberg.jdbc-catalog.catalog-name=iceberg_test +iceberg.jdbc-catalog.default-warehouse-dir=hdfs://hadoop-master:9000/user/hive/warehouse +hive.hdfs.socks-proxy=hadoop-master:1180 diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf new file mode 100644 index 000000000000..b454045ee011 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf @@ -0,0 +1,13 @@ +; disabling caching allows us to run spark queries interchangeably with trino's +spark.sql.catalog.iceberg_test.cache-enabled=false +spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +spark.sql.hive.thriftServer.singleSession=false + +spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.iceberg_test.warehouse=hdfs://hadoop-master:9000/user/hive/warehouse +spark.sql.catalog.iceberg_test.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog +spark.sql.catalog.iceberg_test.uri=jdbc:postgresql://postgresql:25432/test +spark.sql.catalog.iceberg_test.jdbc.user=test +spark.sql.catalog.iceberg_test.jdbc.password=test + +spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000 diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/tpch.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/tpch.properties new file mode 100644 index 000000000000..75110c5acf14 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/tpch.properties @@ -0,0 +1 @@ +connector.name=tpch diff --git 
a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index 13b7a7aef75f..9e3582d6292d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -71,6 +71,7 @@ public final class TestGroups public static final String ICEBERG = "iceberg"; public static final String ICEBERG_FORMAT_VERSION_COMPATIBILITY = "iceberg_format_version_compatibility"; public static final String ICEBERG_REST = "iceberg_rest"; + public static final String ICEBERG_JDBC = "iceberg_jdbc"; public static final String AVRO = "avro"; public static final String PHOENIX = "phoenix"; public static final String CLICKHOUSE = "clickhouse"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index f821dbe0b966..597299624713 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -64,12 +64,12 @@ import static io.trino.testing.DataProviders.toDataProvider; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.ICEBERG; +import static io.trino.tests.product.TestGroups.ICEBERG_JDBC; import static io.trino.tests.product.TestGroups.ICEBERG_REST; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AND_INSERT; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AS_SELECT; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_WITH_NO_DATA_AND_INSERT; -import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.StorageFormat.AVRO; import static io.trino.tests.product.iceberg.util.IcebergTestUtils.getTableLocation; import static io.trino.tests.product.iceberg.util.IcebergTestUtils.stripNamenodeURI; import static io.trino.tests.product.utils.QueryExecutors.onSpark; @@ -100,6 +100,8 @@ public void setup() throws TException { metastoreClient = testHiveMetastoreClientFactory.createMetastoreClient(); + // Create 'default' schema if it doesn't exist because JDBC catalog doesn't have such schema + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS iceberg.default WITH (location = 'hdfs://hadoop-master:9000/user/hive/warehouse/default')"); } @AfterTestWithContext @@ -121,7 +123,7 @@ public void setUp() onTrino().executeQuery(format("CREATE SCHEMA IF NOT EXISTS %s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_trino_reading_primitive_types_" + storageFormat); @@ -218,13 +220,15 @@ public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersi onSpark().executeQuery("DROP TABLE " + sparkTableName); } - 
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "testSparkReadingTrinoDataDataProvider") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "testSparkReadingTrinoDataDataProvider") public void testSparkReadingTrinoData(StorageFormat storageFormat, CreateMode createMode) { String baseTableName = toLowerCase("test_spark_reading_primitive_types_" + storageFormat + "_" + createMode); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + String namedValues = "SELECT " + " VARCHAR 'a_string' _string " + ", 1000000000000000 _bigint " + @@ -344,13 +348,15 @@ public Object[][] testSparkReadingTrinoDataDataProvider() .toArray(Object[][]::new); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadTrinoUuid(StorageFormat storageFormat) { String tableName = toLowerCase("test_spark_read_trino_uuid_" + storageFormat); String trinoTableName = trinoTableName(tableName); String sparkTableName = sparkTableName(tableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + onTrino().executeQuery(format( "CREATE TABLE %s AS SELECT UUID '406caec7-68b9-4778-81b2-a12ece70c8b1' u", trinoTableName)); @@ -359,12 +365,12 @@ public void testSparkReadTrinoUuid(StorageFormat storageFormat) // TODO Iceberg Spark integration needs yet to gain support // once this is supported, merge this test with testSparkReadingTrinoData() .isInstanceOf(SQLException.class) - .hasMessageMatching("org.apache.hive.service.cli.HiveSQLException: Error running query:.*\\Q java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.unsafe.types.UTF8String\\E(?s:.*)"); + .hasMessageMatching("org.apache.hive.service.cli.HiveSQLException: Error running query:.*\\Q java.lang.ClassCastException\\E(?s:.*)"); onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "specVersions") public void testSparkCreatesTrinoDrops(int specVersion) { String baseTableName = "test_spark_creates_trino_drops"; @@ -372,7 +378,7 @@ public void testSparkCreatesTrinoDrops(int specVersion) onTrino().executeQuery("DROP TABLE " + trinoTableName(baseTableName)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testTrinoCreatesSparkDrops() { String baseTableName = "test_trino_creates_spark_drops"; @@ -380,7 +386,7 @@ public void testTrinoCreatesSparkDrops() onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) { String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_" + storageFormat); @@ -408,7 +414,7 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); 
} - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_trino_reads_spark_partitioned_table_" + storageFormat); @@ -527,7 +533,7 @@ public void testPartitionedByNestedFiled() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_trino_reading_spark_composites_" + storageFormat); @@ -555,7 +561,7 @@ public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) { String baseTableName = toLowerCase("test_spark_reading_trino_composites_" + storageFormat); @@ -582,7 +588,7 @@ public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_trino_reading_spark_iceberg_table_properties_" + storageFormat); @@ -610,7 +616,7 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_trino_reading_nested_spark_data_" + storageFormat); @@ -667,7 +673,7 @@ public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int spe onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) { String baseTableName = toLowerCase("test_spark_reading_nested_trino_data_" + storageFormat); @@ -724,7 +730,7 @@ public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = 
{ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_schema_evolution_for_nested_fields_" + storageFormat); @@ -842,7 +848,7 @@ public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_read_after_partition_evolution_" + storageFormat); @@ -920,23 +926,26 @@ public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int spe onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") public void testTrinoShowingSparkCreatedTables(int specVersion) { String sparkTable = "test_table_listing_for_spark"; String trinoTable = "test_table_listing_for_trino"; + onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTable); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTable); + onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG TBLPROPERTIES('format-version' = %s)", sparkTableName(sparkTable), specVersion)); onTrino().executeQuery(format("CREATE TABLE %s (_integer INTEGER )", trinoTableName(trinoTable))); - assertThat(onTrino().executeQuery(format("SHOW TABLES FROM %s LIKE '%s'", TEST_SCHEMA_NAME, "test_table_listing_for_%"))) + assertThat(onTrino().executeQuery(format("SHOW TABLES FROM %s.%s LIKE '%s'", TRINO_CATALOG, TEST_SCHEMA_NAME, "test_table_listing_for_%"))) .containsOnly(row(sparkTable), row(trinoTable)); onSpark().executeQuery("DROP TABLE " + sparkTableName(sparkTable)); onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "specVersions") public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion) { String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data"; @@ -955,7 +964,7 @@ public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion) onSpark().executeQuery(format("DROP TABLE %s", sparkTableName(tableSameLocation2))); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) { String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data"; @@ -975,7 +984,7 @@ public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) // Can't clean up tableSameLocation2 as all data and metadata has been removed } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = 
"storageFormatsWithSpecVersion") public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_object_storage_location_provider_" + storageFormat); @@ -1005,7 +1014,7 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_writer_data_path_" + storageFormat); @@ -1072,7 +1081,7 @@ public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageForma Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), ((value, index) -> row((int) index, value))) .collect(toImmutableList()); - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testStringPartitioningWithSpecialCharactersCtasInTrino() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino"; @@ -1090,7 +1099,7 @@ public void testStringPartitioningWithSpecialCharactersCtasInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testStringPartitioningWithSpecialCharactersInsertInTrino() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino"; @@ -1106,7 +1115,7 @@ public void testStringPartitioningWithSpecialCharactersInsertInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testStringPartitioningWithSpecialCharactersInsertInSpark() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_spark"; @@ -1122,7 +1131,7 @@ public void testStringPartitioningWithSpecialCharactersInsertInSpark() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testPartitioningWithMixedCaseColumnUnsupportedInTrino() { String baseTableName = "test_partitioning_with_mixed_case_column_in_spark"; @@ -1140,7 +1149,7 @@ public void testPartitioningWithMixedCaseColumnUnsupportedInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testInsertReadingFromParquetTableWithNestedRowFieldNotPresentInDataFile() { // regression test for https://github.com/trinodb/trino/issues/9264 @@ -1224,7 +1233,7 @@ private void assertSelectsOnSpecialCharacters(String trinoTableName, String spar /** * @see TestIcebergInsert#testIcebergConcurrentInsert() */ - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, timeOut = 60_000) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, timeOut = 60_000) public void testTrinoSparkConcurrentInsert() throws Exception { @@ -1291,7 +1300,7 @@ public void 
testTrinoSparkConcurrentInsert() } } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsAndCompressionCodecs") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsAndCompressionCodecs") public void testTrinoReadingSparkCompressedData(StorageFormat storageFormat, String compressionCodec) { String baseTableName = toLowerCase("test_spark_compression" + @@ -1356,7 +1365,7 @@ else if ("ZSTD".equals(compressionCodec)) { onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsAndCompressionCodecs") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsAndCompressionCodecs") public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, String compressionCodec) { String baseTableName = toLowerCase("test_trino_compression" + @@ -1375,7 +1384,7 @@ public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, Str .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Unsupported codec: LZ4"); return; } - if (storageFormat == AVRO && (compressionCodec.equals("LZ4"))) { + if (storageFormat == StorageFormat.AVRO && (compressionCodec.equals("LZ4"))) { assertQueryFailure(() -> onTrino().executeQuery(createTable)) .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Unsupported compression codec: " + compressionCodec); return; @@ -1393,7 +1402,7 @@ public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, Str onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void verifyCompressionCodecsDataProvider() { assertThat(onTrino().executeQuery("SHOW SESSION LIKE 'iceberg.compression_codec'")) @@ -1425,7 +1434,7 @@ private List compressionCodecs() "GZIP"); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) { String baseTableName = "test_trino_reading_migrated_nested_data_" + randomNameSuffix(); @@ -1437,9 +1446,8 @@ public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) ", nested_map MAP>>\n" + ", nested_array ARRAY>>>\n" + ", nested_struct STRUCT>)\n" + - " USING %s\n" + - " OPTIONS ('compression'='snappy')"; - onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name())); + " USING %s"; + onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name().toLowerCase(ENGLISH))); String insert = "" + "INSERT INTO TABLE %s SELECT" + @@ -1449,7 +1457,14 @@ public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) " ,map('m2', array(named_struct('mName', 'MAS2Name1', 'mNumber', 401), named_struct('mName', 'MAS2Name2', 'mNumber', 402))))" + ", named_struct('id', 1, 'name', 'P. 
Sherman', 'address', named_struct('street_number', 42, 'street_name', 'Wallaby Way'))"; onSpark().executeQuery(format(insert, defaultCatalogTableName)); - onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + try { + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + } + catch (QueryExecutionException e) { + if (e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) { + throw new SkipException("This catalog doesn't support calling system.migrate procedure"); + } + } String sparkTableName = sparkTableName(baseTableName); Row row = row("Doc213", "ASName2", 201, "MAS2Name1", 302, "P. Sherman", 42, "Wallaby Way"); @@ -1491,7 +1506,7 @@ public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) assertThat(onTrino().executeQuery("SELECT nested_struct.address.street_number, nested_struct.address.street_name FROM " + trinoTableName)).containsOnly(row(null, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) { String baseTableName = "test_migrated_data_with_altered_schema_" + randomNameSuffix(); @@ -1501,16 +1516,22 @@ public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) "CREATE TABLE %s (\n" + " doc_id STRING\n" + ", nested_struct STRUCT>)\n" + - " USING %s\n" + - " OPTIONS ('compression'='snappy')"; - onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name())); + " USING %s"; + onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat)); String insert = "" + "INSERT INTO TABLE %s SELECT" + " 'Doc213'" + ", named_struct('id', 1, 'name', 'P. 
Sherman', 'address', named_struct('a', 42, 'b', 'Wallaby Way'))"; onSpark().executeQuery(format(insert, defaultCatalogTableName)); - onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + try { + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + } + catch (QueryExecutionException e) { + if (e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) { + throw new SkipException("This catalog doesn't support calling system.migrate procedure"); + } + } String sparkTableName = sparkTableName(baseTableName); onSpark().executeQuery("ALTER TABLE " + sparkTableName + " RENAME COLUMN nested_struct TO nested_struct_moved"); @@ -1533,18 +1554,25 @@ public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) assertThat(onTrino().executeQuery(select + trinoTableName)).containsOnly(row(null, null, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) { String baseTableName = "test_migrated_data_with_partial_name_mapping_" + randomNameSuffix(); String defaultCatalogTableName = sparkDefaultCatalogTableName(baseTableName); - String sparkTableDefinition = "CREATE TABLE %s (a INT, b INT) USING " + storageFormat.name() + " OPTIONS ('compression'='snappy')"; + String sparkTableDefinition = "CREATE TABLE %s (a INT, b INT) USING " + storageFormat.name().toLowerCase(ENGLISH); onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName)); String insert = "INSERT INTO TABLE %s SELECT 1, 2"; onSpark().executeQuery(format(insert, defaultCatalogTableName)); - onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + try { + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + } + catch (QueryExecutionException e) { + if (e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) { + throw new SkipException("This catalog doesn't support calling system.migrate procedure"); + } + } String sparkTableName = sparkTableName(baseTableName); String trinoTableName = trinoTableName(baseTableName); @@ -1556,7 +1584,7 @@ public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) .containsOnly(row(1, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testPartialStats() { String tableName = "test_partial_stats_" + randomNameSuffix(); @@ -1586,7 +1614,7 @@ public void testPartialStats() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testStatsAfterAddingPartitionField() { String tableName = "test_stats_after_adding_partition_field_" + randomNameSuffix(); @@ -1612,7 +1640,7 @@ public void testStatsAfterAddingPartitionField() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "tableFormatWithDeleteFormat") public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, 
StorageFormat deleteFileStorageFormat) { String tableName = toLowerCase(format("test_trino_reads_spark_row_level_deletes_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomNameSuffix())); @@ -1645,7 +1673,7 @@ public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "tableFormatWithDeleteFormat") public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { String tableName = toLowerCase(format("test_trino_reads_spark_row_level_deletes_row_types_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomNameSuffix())); @@ -1670,7 +1698,7 @@ public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableSt onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) { String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), randomNameSuffix())); @@ -1697,7 +1725,7 @@ public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat) { String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), randomNameSuffix())); @@ -1716,7 +1744,7 @@ public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storage onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testDeleteAfterPartitionEvolution(StorageFormat storageFormat) { String baseTableName = toLowerCase("test_delete_after_partition_evolution_" + storageFormat + randomNameSuffix()); @@ -1783,7 +1811,7 @@ public void testMissingMetrics() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testOptimizeOnV2IcebergTable() { String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomNameSuffix()); @@ -1899,10 +1927,10 @@ public enum CreateMode { CREATE_TABLE_AND_INSERT, CREATE_TABLE_AS_SELECT, - CREATE_TABLE_WITH_NO_DATA_AND_INSERT, + CREATE_TABLE_WITH_NO_DATA_AND_INSERT } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) { String 
baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_after_expiring_snapshots" + storageFormat); @@ -1942,7 +1970,7 @@ public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat); @@ -1986,7 +2014,7 @@ public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat sto onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat); @@ -2018,7 +2046,7 @@ public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(Stor onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "tableFormatWithDeleteFormat") public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { String baseTableName = toLowerCase("test_cleaning_up_iceberg_table_fails_for_table_v2" + tableStorageFormat); @@ -2055,7 +2083,7 @@ public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableSto .containsOnly(row); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testUpdateAfterSchemaEvolution() { String baseTableName = "test_update_after_schema_evolution_" + randomNameSuffix(); @@ -2097,7 +2125,7 @@ public void testUpdateAfterSchemaEvolution() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testUpdateOnPartitionColumn() { String baseTableName = "test_update_on_partition_column" + randomNameSuffix(); @@ -2130,7 +2158,7 @@ public void testUpdateOnPartitionColumn() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void testAddNotNullColumn() { String baseTableName = "test_add_not_null_column_" + randomNameSuffix(); @@ -2149,7 +2177,7 @@ public void testAddNotNullColumn() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}) public void 
testHandlingPartitionSchemaEvolutionInPartitionMetadata() { String baseTableName = "test_handling_partition_schema_evolution_" + randomNameSuffix();