From a14c77132a5159553926d86fc2028034cb4f6b94 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Mon, 4 Apr 2022 11:27:36 +0900 Subject: [PATCH] Support JDBC catalog in Iceberg connector --- docs/src/main/sphinx/connector/iceberg.rst | 25 +- plugin/trino-iceberg/pom.xml | 23 ++ .../io/trino/plugin/iceberg/CatalogType.java | 1 + .../iceberg/catalog/IcebergCatalogModule.java | 3 + .../jdbc/IcebergJdbcCatalogModule.java | 52 ++++ .../catalog/jdbc/JdbcIcebergClient.java | 246 ++++++++++++++++ .../catalog/jdbc/JdbcIcebergConfig.java | 98 ++++++ .../jdbc/JdbcIcebergConnectionFactory.java | 51 ++++ .../jdbc/JdbcIcebergTableOperations.java | 69 +++++ .../JdbcIcebergTableOperationsProvider.java | 60 ++++ .../catalog/jdbc/TrinoJdbcCatalog.java | 278 ++++++++++++++++++ .../catalog/jdbc/TrinoJdbcCatalogFactory.java | 73 +++++ .../plugin/iceberg/IcebergQueryRunner.java | 8 +- .../plugin/iceberg/TestIcebergPlugin.java | 18 ++ .../jdbc/TestIcebergJdbcConnectorTest.java | 206 +++++++++++++ .../jdbc/TestIcebergJdbcIcebergConfig.java | 58 ++++ .../jdbc/TestingIcebergJdbcServer.java | 97 ++++++ .../src/test/resources/jdbc/create-table.sql | 20 ++ .../EnvSinglenodeSparkIcebergJdbcCatalog.java | 133 +++++++++ .../suite/suites/Suite7NonGeneric.java | 4 + .../apply-hive-config-for-iceberg.sh | 5 + .../create-table.sql | 20 ++ .../hive-site-overrides.xml | 9 + .../iceberg.properties | 8 + .../spark-defaults.conf | 13 + .../io/trino/tests/product/TestGroups.java | 1 + .../TestIcebergSparkCompatibility.java | 205 +++++++------ 27 files changed, 1696 insertions(+), 88 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergClient.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConfig.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConnectionFactory.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperations.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperationsProvider.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcIcebergConfig.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java create mode 100644 plugin/trino-iceberg/src/test/resources/jdbc/create-table.sql create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java create mode 100755 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql create mode 100644 
testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties
 create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf

diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst
index 547f6ad8a3c6..8ebbafa427fe 100644
--- a/docs/src/main/sphinx/connector/iceberg.rst
+++ b/docs/src/main/sphinx/connector/iceberg.rst
@@ -45,10 +45,10 @@ To use Iceberg, you need:
 Configuration
 -------------
 
-The connector supports two Iceberg catalog types, you may use either a Hive
-metastore service (HMS) or AWS Glue. The catalog type is determined by the
-``iceberg.catalog.type`` property, it can be set to either ``HIVE_METASTORE``
-or ``GLUE``.
+The connector supports several Iceberg catalog types: a Hive metastore service
+(HMS), AWS Glue, or a JDBC catalog. The catalog type is determined by the
+``iceberg.catalog.type`` property, which can be set to ``HIVE_METASTORE``,
+``GLUE``, or ``JDBC``.
 
 Hive metastore catalog
 ^^^^^^^^^^^^^^^^^^^^^^
@@ -77,6 +77,25 @@ configuration properties as the Hive connector's Glue setup. See
 
     iceberg.catalog.type=glue
 
+JDBC catalog
+^^^^^^^^^^^^
+
+At a minimum, ``iceberg.metastore.jdbc.connection-url`` and
+``iceberg.metastore.jdbc.catalogid`` must be configured. The JDBC driver JAR
+file must be placed in the plugin directory.
+
+.. code-block:: text
+
+    connector.name=iceberg
+    iceberg.catalog.type=jdbc
+    iceberg.metastore.jdbc.catalogid=test
+    iceberg.metastore.jdbc.connection-url=jdbc:postgresql://example.net:5432/database
+    iceberg.metastore.jdbc.connection-user=root
+    iceberg.metastore.jdbc.connection-password=secret
+    iceberg.metastore.jdbc.default-warehouse-dir=s3://bucket
+
+
 General configuration
 ^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index 7b59d3e29869..6fcb4773fef5 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -197,6 +197,11 @@
+        <dependency>
+            <groupId>org.jdbi</groupId>
+            <artifactId>jdbi3-core</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.weakref</groupId>
             <artifactId>jmxutils</artifactId>
@@ -247,6 +252,12 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
@@ -354,6 +365,18 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java
index 35f46922a785..b7d71f9200fe 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java
@@ -18,5 +18,6 @@ public enum CatalogType
     TESTING_FILE_METASTORE,
     HIVE_METASTORE,
     GLUE,
+    JDBC,
     /**/;
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java
index 9a9dbf795bd2..05c9b699d2b2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java
@@ -30,12 +30,14 @@ import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
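For illustration, a minimal client-side smoke test of a catalog configured as in the documentation above. It is not part of this patch: it connects through the Trino JDBC driver, creates a schema and a table, and reads a count back. The coordinator URL, user name, catalog name iceberg, and schema location are assumptions.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class IcebergJdbcCatalogSmokeTest
    {
        public static void main(String[] args)
                throws Exception
        {
            // Assumes a coordinator on localhost:8080 with the catalog above installed as "iceberg"
            // and the trino-jdbc driver on the classpath; adjust URL, user, and location as needed.
            try (Connection connection = DriverManager.getConnection("jdbc:trino://localhost:8080/iceberg", "admin", null);
                    Statement statement = connection.createStatement()) {
                statement.execute("CREATE SCHEMA IF NOT EXISTS smoke_test WITH (location = 's3://bucket/smoke_test')");
                statement.execute("CREATE TABLE smoke_test.events (id bigint, name varchar)");
                statement.execute("INSERT INTO smoke_test.events VALUES (1, 'created')");
                try (ResultSet resultSet = statement.executeQuery("SELECT count(*) FROM smoke_test.events")) {
                    resultSet.next();
                    System.out.println(resultSet.getLong(1));
                }
            }
        }
    }

The JDBC catalog only changes where the connector keeps its namespace and table pointers; queries and DDL are written exactly as with the Hive metastore or Glue catalogs.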
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; +import io.trino.plugin.iceberg.catalog.jdbc.IcebergJdbcCatalogModule; import java.util.Optional; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; +import static io.trino.plugin.iceberg.CatalogType.JDBC; import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; import static java.util.Objects.requireNonNull; @@ -63,6 +65,7 @@ protected void setup(Binder binder) bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule()); bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); + bindCatalogModule(JDBC, new IcebergJdbcCatalogModule()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java new file mode 100644 index 000000000000..3686c3e06e36 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
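The new bindCatalogModule(JDBC, new IcebergJdbcCatalogModule()) call relies on the module's existing conditional-binding helper, whose body is outside this hunk. The sketch below shows the assumed mechanism: the helper presumably delegates to airlift's conditionalModule, as the imports above suggest, so only the module whose predicate matches iceberg.catalog.type is installed and exactly one TrinoCatalogFactory binding exists at runtime.

    import com.google.inject.Binder;
    import com.google.inject.Module;
    import io.airlift.configuration.AbstractConfigurationAwareModule;
    import io.trino.plugin.iceberg.CatalogType;
    import io.trino.plugin.iceberg.IcebergConfig;
    import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
    import io.trino.plugin.iceberg.catalog.jdbc.IcebergJdbcCatalogModule;

    import static io.airlift.configuration.ConditionalModule.conditionalModule;

    // Sketch only: mirrors what bindCatalogModule is expected to do, assuming
    // IcebergConfig#getCatalogType exposes the iceberg.catalog.type value.
    public class CatalogSelectionSketch
            extends AbstractConfigurationAwareModule
    {
        @Override
        protected void setup(Binder binder)
        {
            bindCatalogModule(CatalogType.HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule());
            bindCatalogModule(CatalogType.JDBC, new IcebergJdbcCatalogModule());
        }

        private void bindCatalogModule(CatalogType catalogType, Module module)
        {
            // Install the module only when the configured catalog type matches.
            install(conditionalModule(
                    IcebergConfig.class,
                    config -> config.getCatalogType() == catalogType,
                    module));
        }
    }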
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class IcebergJdbcCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(JdbcIcebergConfig.class); + binder.bind(TrinoJdbcCatalogFactory.class); + binder.bind(IcebergTableOperationsProvider.class).to(JdbcIcebergTableOperationsProvider.class).in(Scopes.SINGLETON); + newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName(); + binder.bind(TrinoCatalogFactory.class).to(TrinoJdbcCatalogFactory.class).in(Scopes.SINGLETON); + newExporter(binder).export(TrinoJdbcCatalogFactory.class).withGeneratedName(); + } + + @Provides + @Singleton + public static JdbcIcebergClient createJdbcIcebergClient(JdbcIcebergConfig config) + { + return new JdbcIcebergClient( + new JdbcIcebergConnectionFactory( + config.getConnectionUrl(), + config.getConnectionUser(), + config.getConnectionPassword()), + config.getCatalogId()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergClient.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergClient.java new file mode 100644 index 000000000000..d90842cd23bf --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergClient.java @@ -0,0 +1,246 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
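The client defined next builds all of its SQL with Jdbi, using two binding styles: :name binds a single value, while an angle-bracket token such as <namespaces> or <values> is a Jdbi defined attribute that bindList or bindBeanList expands into a parenthesized parameter list at execution time. A standalone sketch of both, with placeholder connection details that are not part of this patch:

    import java.util.List;
    import java.util.Map;

    import org.jdbi.v3.core.Handle;
    import org.jdbi.v3.core.Jdbi;

    public class JdbiBindingSketch
    {
        public static void main(String[] args)
        {
            // Placeholder URL and credentials; any database containing the iceberg_tables table works.
            Jdbi jdbi = Jdbi.create("jdbc:postgresql://localhost:5432/test", "test", "test");

            try (Handle handle = jdbi.open()) {
                // ":catalog" is a named parameter bound to a single value.
                List<String> namespaces = handle.createQuery(
                        "SELECT DISTINCT table_namespace FROM iceberg_tables WHERE catalog_name = :catalog")
                        .bind("catalog", "test")
                        .mapTo(String.class)
                        .list();

                // "<namespaces>" is expanded by bindList into (?, ?, ...) with one placeholder per
                // element. bindList rejects an empty list, so that case must be handled by the caller.
                if (!namespaces.isEmpty()) {
                    List<Map<String, Object>> tables = handle.createQuery(
                            "SELECT table_namespace, table_name FROM iceberg_tables WHERE table_namespace IN (<namespaces>)")
                            .bindList("namespaces", namespaces)
                            .mapToMap()
                            .list();
                    System.out.println(tables);
                }
            }
        }
    }

bindBeanList works the same way for VALUES <values> inserts, emitting one tuple per bean with the listed bean properties.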
+ */
+package io.trino.plugin.iceberg.catalog.jdbc;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.connector.SchemaTableName;
+import org.jdbi.v3.core.Handle;
+import org.jdbi.v3.core.Jdbi;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.util.Objects.requireNonNull;
+
+public class JdbcIcebergClient
+{
+    private final JdbcIcebergConnectionFactory connectionFactory;
+    private final String catalogId;
+
+    public JdbcIcebergClient(JdbcIcebergConnectionFactory connectionFactory, String catalogId)
+    {
+        this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory is null");
+        this.catalogId = requireNonNull(catalogId, "catalogId is null");
+    }
+
+    public List<String> getNamespaces()
+    {
+        ImmutableList.Builder<String> namespaces = ImmutableList.builder();
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            namespaces.addAll(handle.createQuery("" +
+                            "SELECT DISTINCT table_namespace " +
+                            "FROM iceberg_tables " +
+                            "WHERE catalog_name = :catalog")
+                    .bind("catalog", catalogId)
+                    .mapTo(String.class)
+                    .list());
+            namespaces.addAll(handle.createQuery("" +
+                            "SELECT DISTINCT namespace " +
+                            "FROM iceberg_namespace_properties " +
+                            "WHERE catalog_name = :catalog")
+                    .bind("catalog", catalogId)
+                    .mapTo(String.class)
+                    .list());
+        }
+        return namespaces.build();
+    }
+
+    public Map<String, String> getNamespaceProperties(String namespace)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            return handle.createQuery("" +
+                            "SELECT key, value " +
+                            "FROM iceberg_namespace_properties " +
+                            "WHERE catalog_name = :catalog AND namespace = :schema")
+                    .bind("catalog", catalogId)
+                    .bind("schema", namespace)
+                    .map((rs, ctx) -> new SimpleEntry<>(rs.getString("key"), rs.getString("value")))
+                    .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+        }
+    }
+
+    public Optional<String> getNamespaceLocation(String namespace)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            return handle.createQuery("" +
+                            "SELECT value " +
+                            "FROM iceberg_namespace_properties " +
+                            "WHERE catalog_name = :catalog AND namespace = :schema AND key = 'location'")
+                    .bind("catalog", catalogId)
+                    .bind("schema", namespace)
+                    .mapTo(String.class)
+                    .findOne();
+        }
+    }
+
+    public void createNamespace(String namespace, Map<String, Object> properties)
+    {
+        ImmutableList.Builder<NamespaceProperties> namespaceProperties = ImmutableList.builderWithExpectedSize(properties.size() + 1);
+        namespaceProperties.add(new NamespaceProperties(catalogId, namespace, "exists", "true"));
+        for (Entry<String, Object> property : properties.entrySet()) {
+            namespaceProperties.add(new NamespaceProperties(catalogId, namespace, property.getKey(), property.getValue().toString()));
+        }
+
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            handle.createUpdate("" +
+                            "INSERT INTO iceberg_namespace_properties " +
+                            "(catalog_name, namespace, key, value) " +
+                            "VALUES <values>")
+                    .bindBeanList("values", namespaceProperties.build(), ImmutableList.of("catalogName", "namespace", "key", "value"))
+                    .execute();
+        }
+    }
+
+    public void dropNamespace(String namespace)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            handle.createUpdate("" +
+                            "DELETE FROM iceberg_namespace_properties " +
+                            "WHERE catalog_name = :catalog AND namespace = :schema")
+                    .bind("catalog", catalogId)
+                    .bind("schema", namespace)
+                    .execute();
+        }
+    }
+
+    public List<SchemaTableName> getTables(List<String> namespaces)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            return handle.createQuery("" +
+                            "SELECT table_namespace, table_name " +
+                            "FROM iceberg_tables " +
+                            "WHERE catalog_name = :catalog AND table_namespace IN (<namespaces>)")
+                    .bind("catalog", catalogId)
+                    .bindList("namespaces", namespaces)
+                    .map((rs, ctx) -> new SchemaTableName(rs.getString("table_namespace"), rs.getString("table_name")))
+                    .list();
+        }
+    }
+
+    public void createTable(String schemaName, String tableName, String metadataLocation)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            handle.createUpdate("" +
+                            "INSERT INTO iceberg_tables " +
+                            "(catalog_name, table_namespace, table_name, metadata_location, previous_metadata_location) " +
+                            "VALUES (:catalog, :schema, :table, :metadata_location, null)")
+                    .bind("catalog", catalogId)
+                    .bind("schema", schemaName)
+                    .bind("table", tableName)
+                    .bind("metadata_location", metadataLocation)
+                    .execute();
+        }
+    }
+
+    public void alterTable(String schemaName, String tableName, String newMetadataLocation, String previousMetadataLocation)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            handle.createUpdate("" +
+                            "UPDATE iceberg_tables " +
+                            "SET metadata_location = :metadata_location, previous_metadata_location = :previous_metadata_location " +
+                            "WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table")
+                    .bind("metadata_location", newMetadataLocation)
+                    .bind("previous_metadata_location", previousMetadataLocation)
+                    .bind("catalog", catalogId)
+                    .bind("schema", schemaName)
+                    .bind("table", tableName)
+                    .execute();
+        }
+    }
+
+    public Optional<String> getMetadataLocation(String schemaName, String tableName)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            return handle.createQuery("" +
+                            "SELECT metadata_location " +
+                            "FROM iceberg_tables " +
+                            "WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table")
+                    .bind("catalog", catalogId)
+                    .bind("schema", schemaName)
+                    .bind("table", tableName)
+                    .mapTo(String.class)
+                    .findOne();
+        }
+    }
+
+    public void dropTable(String schemaName, String tableName)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            handle.createUpdate("" +
+                            "DELETE FROM iceberg_tables " +
+                            "WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table")
+                    .bind("catalog", catalogId)
+                    .bind("schema", schemaName)
+                    .bind("table", tableName)
+                    .execute();
+        }
+    }
+
+    public void renameTable(SchemaTableName from, SchemaTableName to)
+    {
+        try (Handle handle = Jdbi.open(connectionFactory)) {
+            handle.createUpdate("" +
+                            "UPDATE iceberg_tables " +
+                            "SET table_namespace = :new_schema, table_name = :new_table " +
+                            "WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table")
+                    .bind("catalog", catalogId)
+                    .bind("new_schema", to.getSchemaName())
+                    .bind("new_table", to.getTableName())
+                    .bind("schema", from.getSchemaName())
+                    .bind("table", from.getTableName())
+                    .execute();
+        }
+    }
+
+    // Requires public access modifier for JDBI library
+    public static class NamespaceProperties
+    {
+        private final String catalogName;
+        private final String namespace;
+        private final String key;
+        private final String value;
+
+        public NamespaceProperties(String catalogName, String namespace, String key, String value)
+        {
+            this.catalogName = catalogName;
+            this.namespace = namespace;
+            this.key = key;
+            this.value = value;
+        }
+
+        public String getCatalogName()
+        {
+            return catalogName;
+        }
+
+        public String getNamespace()
+        {
+            return namespace;
+        }
+
+        public String getKey()
+        {
+ return key; + } + + public String getValue() + { + return value; + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConfig.java new file mode 100644 index 000000000000..1a8d09706940 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConfig.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; + +import javax.validation.constraints.NotEmpty; + +import java.util.Optional; + +public class JdbcIcebergConfig +{ + private String connectionUrl; + private String connectionUser; + private String connectionPassword; + private String catalogId; + private Optional defaultWarehouseDir = Optional.empty(); + + public String getConnectionUrl() + { + return connectionUrl; + } + + @Config("iceberg.metastore.jdbc.connection-url") + @ConfigDescription("The URI to connect to the JDBC server") + public JdbcIcebergConfig setConnectionUrl(String connectionUrl) + { + this.connectionUrl = connectionUrl; + return this; + } + + public Optional getConnectionUser() + { + return Optional.ofNullable(connectionUser); + } + + @Config("iceberg.metastore.jdbc.connection-user") + @ConfigDescription("User name for JDBC client") + public JdbcIcebergConfig setConnectionUser(String connectionUser) + { + this.connectionUser = connectionUser; + return this; + } + + public Optional getConnectionPassword() + { + return Optional.ofNullable(connectionPassword); + } + + @Config("iceberg.metastore.jdbc.connection-password") + @ConfigSecuritySensitive + @ConfigDescription("Password for JDBC client") + public JdbcIcebergConfig setConnectionPassword(String connectionPassword) + { + this.connectionPassword = connectionPassword; + return this; + } + + @NotEmpty + public String getCatalogId() + { + return catalogId; + } + + @Config("iceberg.metastore.jdbc.catalogid") + @ConfigDescription("Iceberg JDBC metastore catalog id") + public JdbcIcebergConfig setCatalogId(String catalogId) + { + this.catalogId = catalogId; + return this; + } + + public Optional getDefaultWarehouseDir() + { + return defaultWarehouseDir; + } + + @Config("iceberg.metastore.jdbc.default-warehouse-dir") + @ConfigDescription("The default warehouse directory to use for JDBC") + public JdbcIcebergConfig setDefaultWarehouseDir(String defaultWarehouseDir) + { + this.defaultWarehouseDir = Optional.ofNullable(defaultWarehouseDir); + return this; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConnectionFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConnectionFactory.java new file mode 100644 index 000000000000..97f3371851ed --- /dev/null +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergConnectionFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import org.jdbi.v3.core.ConnectionFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Optional; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkState; + +public class JdbcIcebergConnectionFactory + implements ConnectionFactory +{ + private final String connectionUrl; + private final Optional connectionUser; + private final Optional connectionPassword; + + public JdbcIcebergConnectionFactory(String connectionUrl, Optional connectionUser, Optional connectionPassword) + { + this.connectionUrl = connectionUrl; + this.connectionUser = connectionUser; + this.connectionPassword = connectionPassword; + } + + @Override + public Connection openConnection() + throws SQLException + { + Properties properties = new Properties(); + connectionUser.ifPresent(user -> properties.setProperty("user", user)); + connectionPassword.ifPresent(password -> properties.setProperty("password", password)); + Connection connection = DriverManager.getConnection(connectionUrl, properties); + checkState(connection != null, "Driver returned null connection, make sure the connection URL '%s' is valid", connectionUrl); + return connection; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperations.java new file mode 100644 index 000000000000..affcb08e5d53 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperations.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class JdbcIcebergTableOperations + extends AbstractIcebergTableOperations +{ + private final JdbcIcebergClient jdbcClient; + protected int version = -1; + + public JdbcIcebergTableOperations( + FileIO fileIo, + JdbcIcebergClient jdbcClient, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, session, database, table, owner, location); + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + } + + @Override + protected String getRefreshedLocation(boolean invalidateCaches) + { + return jdbcClient.getMetadataLocation(database, tableName) + .orElseThrow(() -> new TableNotFoundException(getSchemaTableName())); + } + + @Override + protected void commitNewTable(TableMetadata metadata) + { + verify(version == -1, "commitNewTable called on a table which already exists"); + jdbcClient.createTable(database, tableName, writeNewMetadata(metadata, 0)); + shouldRefresh = true; + } + + @Override + protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) + { + checkState(currentMetadataLocation != null, "No current metadata location for existing table"); + jdbcClient.alterTable(database, tableName, writeNewMetadata(metadata, version + 1), currentMetadataLocation); + shouldRefresh = true; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperationsProvider.java new file mode 100644 index 000000000000..f2ac8a728b35 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/JdbcIcebergTableOperationsProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
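commitToExistingTable above writes the new metadata file and then repoints the catalog row through alterTable. For comparison, here is a plain-JDBC sketch of the same pointer flip written as a compare-and-swap on the expected current location, the guard Iceberg's reference JDBC catalog uses to detect concurrent commits; the table layout follows the create-table.sql added later in this patch, and the helper itself is hypothetical:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public final class MetadataPointerSwap
    {
        private MetadataPointerSwap() {}

        public static boolean swap(
                Connection connection,
                String catalogName,
                String schemaName,
                String tableName,
                String expectedLocation,
                String newLocation)
                throws SQLException
        {
            try (PreparedStatement statement = connection.prepareStatement("" +
                    "UPDATE iceberg_tables " +
                    "SET metadata_location = ?, previous_metadata_location = ? " +
                    "WHERE catalog_name = ? AND table_namespace = ? AND table_name = ? AND metadata_location = ?")) {
                statement.setString(1, newLocation);
                statement.setString(2, expectedLocation);
                statement.setString(3, catalogName);
                statement.setString(4, schemaName);
                statement.setString(5, tableName);
                statement.setString(6, expectedLocation);
                // Zero updated rows means another writer moved the pointer first;
                // the caller should refresh its metadata and retry the commit.
                return statement.executeUpdate() == 1;
            }
        }
    }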
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; +import io.trino.plugin.iceberg.FileIoProvider; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.connector.ConnectorSession; + +import javax.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class JdbcIcebergTableOperationsProvider + implements IcebergTableOperationsProvider +{ + private final FileIoProvider fileIoProvider; + private final JdbcIcebergClient jdbcClient; + + @Inject + public JdbcIcebergTableOperationsProvider(FileIoProvider fileIoProvider, JdbcIcebergClient jdbcClient) + { + this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + } + + @Override + public IcebergTableOperations createTableOperations( + TrinoCatalog catalog, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + return new JdbcIcebergTableOperations( + fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), + jdbcClient, + session, + database, + table, + owner, + location); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java new file mode 100644 index 000000000000..88e86bcaa9c7 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -0,0 +1,278 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
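The operations provider above hands each table a FileIO, and the catalog that follows resolves a table by combining the metadata_location stored in iceberg_tables with that FileIO. A sketch of that last step using Iceberg's public APIs directly; the metadata path is a placeholder and the snippet assumes iceberg-core and Hadoop are on the classpath, as they are for this connector:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.hadoop.HadoopFileIO;
    import org.apache.iceberg.io.FileIO;

    public class ReadMetadataPointerSketch
    {
        public static void main(String[] args)
        {
            // Placeholder: in the connector this value comes from JdbcIcebergClient.getMetadataLocation()
            String metadataLocation = "file:///tmp/warehouse/smoke_test/events/metadata/00000-demo.metadata.json";

            FileIO io = new HadoopFileIO(new Configuration());
            TableMetadata metadata = TableMetadataParser.read(io, metadataLocation);
            System.out.println(metadata.location());
            System.out.println(metadata.schema());
        }
    }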
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.TypeManager; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.Transaction; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; +import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class TrinoJdbcCatalog + extends AbstractTrinoCatalog +{ + private final JdbcIcebergClient jdbcClient; + private final Optional defaultWarehouseDir; + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + public TrinoJdbcCatalog( + CatalogName catalogName, + TypeManager typeManager, + IcebergTableOperationsProvider tableOperationsProvider, + JdbcIcebergClient jdbcClient, + String trinoVersion, + boolean useUniqueTableLocation, + Optional defaultWarehouseDir) + { + super(catalogName, typeManager, tableOperationsProvider, trinoVersion, useUniqueTableLocation); + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + this.defaultWarehouseDir = requireNonNull(defaultWarehouseDir, "defaultWarehouseDir is null"); + } + + @Override + public List listNamespaces(ConnectorSession session) + { + return jdbcClient.getNamespaces(); + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + return jdbcClient.getNamespaceProperties(namespace).entrySet().stream() + .filter(property -> property.getKey().equals(LOCATION_PROPERTY)) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + return Optional.empty(); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + jdbcClient.createNamespace(namespace, properties); + } + + @Override + public void dropNamespace(ConnectorSession session, String namespace) + { + jdbcClient.dropNamespace(namespace); + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal) + { + throw 
new TrinoException(NOT_SUPPORTED, "setNamespacePrincipal is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for Iceberg JDBC catalogs"); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + return jdbcClient.getTables(namespaces); + } + + @Override + public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, Schema schema, PartitionSpec partitionSpec, String location, Map properties) + { + if (!listNamespaces(session).contains(schemaTableName.getSchemaName())) { + throw new SchemaNotFoundException(schemaTableName.getSchemaName()); + } + return newCreateTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + location, + properties, + Optional.of(session.getUser())); + } + + @Override + public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) + { + validateTableCanBeDropped(loadTable(session, schemaTableName)); + jdbcClient.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + jdbcClient.renameTable(from, to); + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + schemaTableName, + ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + + return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + String tableName = createNewTableName(schemaTableName.getTableName()); + Optional databaseLocation = jdbcClient.getNamespaceLocation(schemaTableName.getSchemaName()); + + Path location; + if (databaseLocation.isEmpty()) { + if (defaultWarehouseDir.isEmpty()) { + throw new TrinoException( + HIVE_DATABASE_LOCATION_ERROR, + format("Schema '%s' location cannot be determined. 
" + + "Either set the 'location' property when creating the schema, or set the 'hive.metastore.glue.default-warehouse-dir' " + + "catalog property.", + schemaTableName.getSchemaName())); + } + location = new Path(new Path(defaultWarehouseDir.get(), schemaTableName.getSchemaName()), tableName); + } + else { + location = new Path(databaseLocation.get(), tableName); + } + + return location.toString(); + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, "createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + return ImmutableMap.of(); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + { + return Optional.empty(); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public Optional redirectTable(ConnectorSession session, SchemaTableName tableName) + { + return Optional.empty(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java new file mode 100644 index 
000000000000..00df47809bff --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.security.ConnectorIdentity; +import io.trino.spi.type.TypeManager; + +import javax.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class TrinoJdbcCatalogFactory + implements TrinoCatalogFactory +{ + private final CatalogName catalogName; + private final TypeManager typeManager; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final JdbcIcebergClient jdbcClient; + private final String trinoVersion; + private final Optional defaultWarehouseDir; + private final boolean isUniqueTableLocation; + + @Inject + public TrinoJdbcCatalogFactory( + CatalogName catalogName, + TypeManager typeManager, + IcebergTableOperationsProvider tableOperationsProvider, + JdbcIcebergClient jdbcClient, + NodeVersion nodeVersion, + JdbcIcebergConfig jdbcConfig, + IcebergConfig icebergConfig) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); + this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); + this.defaultWarehouseDir = requireNonNull(jdbcConfig, "jdbcConfig is null").getDefaultWarehouseDir(); + this.isUniqueTableLocation = requireNonNull(icebergConfig, "icebergConfig is null").isUniqueTableLocation(); + } + + @Override + public TrinoCatalog create(ConnectorIdentity identity) + { + return new TrinoJdbcCatalog( + catalogName, + typeManager, + tableOperationsProvider, + jdbcClient, + trinoVersion, + isUniqueTableLocation, + defaultWarehouseDir); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 80be705ac192..b404a8cf15fa 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -116,11 +116,15 @@ public DistributedQueryRunner build() queryRunner.installPlugin(new IcebergPlugin()); Map icebergProperties = new HashMap<>(this.icebergProperties.buildOrThrow()); - if (!icebergProperties.containsKey("iceberg.catalog.type")) { - Path dataDir = 
metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); + String catalogType = icebergProperties.get("iceberg.catalog.type"); + Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); + if (catalogType == null) { icebergProperties.put("iceberg.catalog.type", "TESTING_FILE_METASTORE"); icebergProperties.put("hive.metastore.catalog.dir", dataDir.toString()); } + if ("jdbc".equalsIgnoreCase(catalogType) && !icebergProperties.containsKey("iceberg.metastore.jdbc.default-warehouse-dir")) { + icebergProperties.put("iceberg.metastore.jdbc.default-warehouse-dir", dataDir.toString()); + } queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); schemaInitializer.orElse(SchemaInitializer.builder().build()).accept(queryRunner); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index f2516b8c806c..ab53e9987825 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -98,6 +98,24 @@ public void testGlueMetastore() .shutdown(); } + @Test + public void testJdbcMetastore() + { + ConnectorFactory factory = getConnectorFactory(); + + factory.create( + "test", + Map.of( + "iceberg.catalog.type", "jdbc", + "iceberg.metastore.jdbc.connection-url", "jdbc:postgresql://localhost:5432/test", + "iceberg.metastore.jdbc.connection-user", "alice", + "iceberg.metastore.jdbc.connection-password", "password", + "iceberg.metastore.jdbc.catalogid", "test", + "iceberg.metastore.jdbc.default-warehouse-dir", "s3://bucket"), + new TestingConnectorContext()) + .shutdown(); + } + @Test public void testRecordingMetastore() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java new file mode 100644 index 000000000000..378a7f4025da --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java @@ -0,0 +1,206 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
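For manual experimentation, the pieces used by the tests below can be combined into a standalone runner. This is a hypothetical helper, not part of the patch; it mirrors the builder calls shown in TestIcebergJdbcConnectorTest and leaves the PostgreSQL container running so the catalog tables can be inspected:

    import com.google.common.collect.ImmutableMap;
    import io.airlift.log.Logger;
    import io.trino.plugin.iceberg.IcebergQueryRunner;
    import io.trino.testing.DistributedQueryRunner;

    public final class IcebergJdbcQueryRunnerMain
    {
        private IcebergJdbcQueryRunnerMain() {}

        public static void main(String[] args)
                throws Exception
        {
            TestingIcebergJdbcServer server = new TestingIcebergJdbcServer();
            DistributedQueryRunner queryRunner = IcebergQueryRunner.builder()
                    .setIcebergProperties(ImmutableMap.of(
                            "iceberg.catalog.type", "jdbc",
                            "iceberg.metastore.jdbc.connection-url", server.getJdbcUrl(),
                            "iceberg.metastore.jdbc.connection-user", server.getUser(),
                            "iceberg.metastore.jdbc.connection-password", server.getPassword(),
                            "iceberg.metastore.jdbc.catalogid", "test"))
                    .build();

            Logger log = Logger.get(IcebergJdbcQueryRunnerMain.class);
            log.info("Trino server: %s", queryRunner.getCoordinator().getBaseUrl());
            log.info("Catalog database: %s", server.getJdbcUrl());
        }
    }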
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.plugin.iceberg.BaseIcebergConnectorTest; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.testng.SkipException; +import org.testng.annotations.Test; + +import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; +import static io.trino.tpch.TpchTable.LINE_ITEM; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergJdbcConnectorTest + extends BaseIcebergConnectorTest +{ + public TestIcebergJdbcConnectorTest() + { + super(ORC); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", "ORC") + .put("iceberg.catalog.type", "jdbc") + .put("iceberg.metastore.jdbc.connection-url", server.getJdbcUrl()) + .put("iceberg.metastore.jdbc.connection-user", server.getUser()) + .put("iceberg.metastore.jdbc.connection-password", server.getPassword()) + .put("iceberg.metastore.jdbc.catalogid", "test") + .buildOrThrow()) + .setInitialTables(ImmutableList.>builder() + .addAll(REQUIRED_TPCH_TABLES) + .add(LINE_ITEM) + .build()) + .build(); + } + + @Override + public void testShowCreateSchema() + { + assertThat(computeActual("SHOW CREATE SCHEMA tpch").getOnlyValue().toString()) + .isEqualTo("CREATE SCHEMA iceberg.tpch"); + } + + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessage("renameNamespace is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testShowCreateView() + { + assertThatThrownBy(super::testShowCreateView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCompatibleTypeChangeForView() + { + assertThatThrownBy(super::testCompatibleTypeChangeForView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testCompatibleTypeChangeForView2() + { + assertThatThrownBy(super::testCompatibleTypeChangeForView2) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testDropNonEmptySchemaWithView() + { + assertThatThrownBy(super::testDropNonEmptySchemaWithView) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Test(dataProvider = "testViewMetadataDataProvider") + @Override + public void testViewMetadata(String securityClauseInCreate, String securityClauseInShowCreate) + { + assertThatThrownBy(() -> super.testViewMetadata(securityClauseInCreate, securityClauseInShowCreate)) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testViewCaseSensitivity() + { + assertThatThrownBy(super::testViewCaseSensitivity) + .hasMessage("createView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testViewAndMaterializedViewTogether() + { + assertThatThrownBy(super::testViewAndMaterializedViewTogether) + .hasMessage("createView is 
not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + assertThatThrownBy(super::testReadMetadataWithRelationsConcurrentModifications) + .hasMessage("java.lang.RuntimeException: createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Test(dataProvider = "testColumnNameDataProvider") + @Override + public void testMaterializedViewColumnName(String columnName) + { + assertThatThrownBy(() -> super.testMaterializedViewColumnName(columnName)) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testDropNonEmptySchemaWithMaterializedView() + { + assertThatThrownBy(super::testDropNonEmptySchemaWithMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testRenameMaterializedView() + { + assertThatThrownBy(super::testRenameMaterializedView) + .hasMessage("createMaterializedView is not supported for Iceberg JDBC catalogs"); + } + + @Override + public void testUpdateRowConcurrently() + { + throw new SkipException("Enable after fixing incorrect result issue"); + } + + @Override + public void testInsertRowConcurrently() + { + throw new SkipException("Enable after fixing incorrect result issue"); + } + + @Override + public void testAddColumnConcurrently() + { + throw new SkipException("Enable after fixing incorrect result issue"); + } + + @Override + protected boolean supportsIcebergFileStatistics(String typeName) + { + return !(typeName.equalsIgnoreCase("varbinary")) && + !(typeName.equalsIgnoreCase("uuid")); + } + + @Override + protected boolean supportsRowGroupStatistics(String typeName) + { + return !typeName.equalsIgnoreCase("varbinary"); + } + + @Override + protected Session withSmallRowGroups(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "10") + .build(); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcIcebergConfig.java new file mode 100644 index 000000000000..e32cbc509fbd --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcIcebergConfig.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestIcebergJdbcIcebergConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(JdbcIcebergConfig.class) + .setConnectionUrl(null) + .setConnectionUser(null) + .setConnectionPassword(null) + .setCatalogId(null) + .setDefaultWarehouseDir(null)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("iceberg.metastore.jdbc.connection-url", "jdbc:postgresql://localhost:5432/test") + .put("iceberg.metastore.jdbc.connection-user", "alice") + .put("iceberg.metastore.jdbc.connection-password", "password") + .put("iceberg.metastore.jdbc.catalogid", "test") + .put("iceberg.metastore.jdbc.default-warehouse-dir", "s3://bucket") + .buildOrThrow(); + + JdbcIcebergConfig expected = new JdbcIcebergConfig() + .setConnectionUrl("jdbc:postgresql://localhost:5432/test") + .setConnectionUser("alice") + .setConnectionPassword("password") + .setCatalogId("test") + .setDefaultWarehouseDir("s3://bucket"); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java new file mode 100644 index 000000000000..94af660f533a --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.jdbc; + +import org.intellij.lang.annotations.Language; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; + +import java.io.Closeable; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +import static java.lang.String.format; +import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT; + +public class TestingIcebergJdbcServer + implements Closeable +{ + private static final String USER = "test"; + private static final String PASSWORD = "test"; + private static final String DATABASE = "tpch"; + + private final PostgreSQLContainer dockerContainer; + + public TestingIcebergJdbcServer() + { + // TODO: Use Iceberg docker image once the community provides it + dockerContainer = new PostgreSQLContainer<>("postgres:12.10") + .withDatabaseName(DATABASE) + .withUsername(USER) + .withPassword(PASSWORD) + .withCopyFileToContainer(MountableFile.forClasspathResource("jdbc/create-table.sql"), "/docker-entrypoint-initdb.d/create-table.sql"); + dockerContainer.start(); + + execute("CREATE SCHEMA tpch"); + } + + public void execute(@Language("SQL") String sql) + { + execute(getJdbcUrl(), getProperties(), sql); + } + + private static void execute(String url, Properties properties, String sql) + { + try (Connection connection = DriverManager.getConnection(url, properties); + Statement statement = connection.createStatement()) { + statement.execute(sql); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public String getUser() + { + return USER; + } + + public String getPassword() + { + return PASSWORD; + } + + public Properties getProperties() + { + Properties properties = new Properties(); + properties.setProperty("user", USER); + properties.setProperty("password", PASSWORD); + properties.setProperty("currentSchema", "tpch,public"); + return properties; + } + + public String getJdbcUrl() + { + return format("jdbc:postgresql://%s:%s/%s", dockerContainer.getContainerIpAddress(), dockerContainer.getMappedPort(POSTGRESQL_PORT), DATABASE); + } + + @Override + public void close() + { + dockerContainer.close(); + } +} diff --git a/plugin/trino-iceberg/src/test/resources/jdbc/create-table.sql b/plugin/trino-iceberg/src/test/resources/jdbc/create-table.sql new file mode 100644 index 000000000000..550d066d7ee0 --- /dev/null +++ b/plugin/trino-iceberg/src/test/resources/jdbc/create-table.sql @@ -0,0 +1,20 @@ +/** + Table definition in Iceberg repository: + https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java + */ +CREATE TABLE iceberg_namespace_properties ( + catalog_name VARCHAR(255) NOT NULL, + namespace VARCHAR(255) NOT NULL, + key VARCHAR(5500), + value VARCHAR(5500), + PRIMARY KEY (catalog_name, namespace, key ) +); + +CREATE TABLE iceberg_tables ( + catalog_name VARCHAR(255) NOT NULL, + table_namespace VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + metadata_location VARCHAR(5500), + previous_metadata_location VARCHAR(5500), + PRIMARY KEY (catalog_name, table_namespace, table_name) +); diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java new file mode 100644 index 
000000000000..7688e10d61ae --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergJdbcCatalog.java @@ -0,0 +1,133 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; +import io.trino.tests.product.launcher.testcontainers.PortBinder; +import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D; +import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TEMPTO_PROFILE_CONFIG; +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public class EnvSinglenodeSparkIcebergJdbcCatalog + extends EnvironmentProvider +{ + private static final int SPARK_THRIFT_PORT = 10213; + // Use non-default PostgreSQL port to avoid conflicts with locally installed PostgreSQL if any. 
+ public static final int POSTGRESQL_PORT = 25432; + + private final DockerFiles dockerFiles; + private final PortBinder portBinder; + private final String imagesVersion; + private final String hadoopImagesVersion; + + @Inject + public EnvSinglenodeSparkIcebergJdbcCatalog(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder) + { + super(ImmutableList.of(standard, hadoop)); + this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); + this.portBinder = requireNonNull(portBinder, "portBinder is null"); + this.imagesVersion = requireNonNull(config, "config is null").getImagesVersion(); + this.hadoopImagesVersion = requireNonNull(config, "config is null").getHadoopImagesVersion(); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + String dockerImageName = "ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion; + + builder.configureContainer(HADOOP, container -> { + container.setDockerImageName(dockerImageName); + container.withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh")), + CONTAINER_HADOOP_INIT_D + "/apply-hive-config-for-iceberg.sh"); + }); + + builder.configureContainer(TESTS, dockerContainer -> { + dockerContainer.withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/tempto/tempto-configuration-for-hive3.yaml")), + CONTAINER_TEMPTO_PROFILE_CONFIG); + }); + + builder.addConnector("iceberg", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties"))); + + builder.addContainer(createPostgreSql()); + + builder.addContainer(createSpark()) + .containerDependsOn("spark", HADOOP); + } + + @SuppressWarnings("resource") + public DockerContainer createPostgreSql() + { + DockerContainer container = new DockerContainer("postgres:14.2", "postgresql") + .withEnv("POSTGRES_PASSWORD", "test") + .withEnv("POSTGRES_USER", "test") + .withEnv("POSTGRES_DB", "test") + .withEnv("PGPORT", Integer.toString(POSTGRESQL_PORT)) + .withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql")), + "/docker-entrypoint-initdb.d/create-table.sql") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(POSTGRESQL_PORT)); + + portBinder.exposePort(container, POSTGRESQL_PORT); + + return container; + } + + @SuppressWarnings("resource") + private DockerContainer createSpark() + { + try { + DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3.0-iceberg:" + imagesVersion, "spark") + .withEnv("HADOOP_USER_NAME", "hive") + .withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf")), + "/spark/conf/spark-defaults.conf") + .withCommand( + "spark-submit", + "--master", "local[*]", + "--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", + "--name", "Thrift JDBC/ODBC Server", + "--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT, + "spark-internal") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(SPARK_THRIFT_PORT)); + + portBinder.exposePort(container, SPARK_THRIFT_PORT); + + return container; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git
a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java index 358710df55fe..2663898bd46e 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java @@ -25,6 +25,7 @@ import io.trino.tests.product.launcher.env.environment.EnvSinglenodePostgresql; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkHive; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIceberg; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergJdbcCatalog; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSqlserver; import io.trino.tests.product.launcher.env.environment.EnvTwoKerberosHives; import io.trino.tests.product.launcher.env.environment.EnvTwoMixedHives; @@ -67,6 +68,9 @@ public List getTestRuns(EnvironmentConfig config) .withGroups("configured_features", "iceberg") .withExcludedGroups("storage_formats") .build(), + testOnEnvironment(EnvSinglenodeSparkIcebergJdbcCatalog.class) + .withGroups("configured_features", "iceberg_jdbc") + .build(), testOnEnvironment(EnvSinglenodeHiveIcebergRedirections.class) .withGroups("configured_features", "hive_iceberg_redirections") .build(), diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh new file mode 100755 index 000000000000..2dc1e54559ea --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/apply-hive-config-for-iceberg.sh @@ -0,0 +1,5 @@ +#!/bin/bash +set -exuo pipefail + +echo "Applying hive-site configuration overrides for Spark" +apply-site-xml-override /etc/hive/conf/hive-site.xml "/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml" diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql new file mode 100644 index 000000000000..550d066d7ee0 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/create-table.sql @@ -0,0 +1,20 @@ +/** + Table definition in Iceberg repository: + https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java + */ +CREATE TABLE iceberg_namespace_properties ( + catalog_name VARCHAR(255) NOT NULL, + namespace VARCHAR(255) NOT NULL, + key VARCHAR(5500), + value VARCHAR(5500), + PRIMARY KEY (catalog_name, namespace, key ) +); + +CREATE TABLE iceberg_tables ( + catalog_name VARCHAR(255) NOT NULL, + table_namespace VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + metadata_location VARCHAR(5500), + 
previous_metadata_location VARCHAR(5500), + PRIMARY KEY (catalog_name, table_namespace, table_name) +); diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml new file mode 100644 index 000000000000..b1411b3ab8a9 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/hive-site-overrides.xml @@ -0,0 +1,9 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<configuration> + <property> + <name>hive.metastore.disallow.incompatible.col.type.changes</name> + <value>false</value> + </property> +</configuration> diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties new file mode 100644 index 000000000000..f4ce17e7042f --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties @@ -0,0 +1,8 @@ +connector.name=iceberg +iceberg.catalog.type=jdbc +iceberg.metastore.jdbc.connection-url=jdbc:postgresql://postgresql:25432/test +iceberg.metastore.jdbc.connection-user=test +iceberg.metastore.jdbc.connection-password=test +iceberg.metastore.jdbc.catalogid=iceberg_test +iceberg.metastore.jdbc.default-warehouse-dir=hdfs://hadoop-master:9000/user/hive/warehouse +hive.hdfs.socks-proxy=hadoop-master:1180 diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf new file mode 100644 index 000000000000..b454045ee011 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/spark-defaults.conf @@ -0,0 +1,13 @@ +; disabling caching allows us to run spark queries interchangeably with trino's +spark.sql.catalog.iceberg_test.cache-enabled=false +spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +spark.sql.hive.thriftServer.singleSession=false + +spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.iceberg_test.warehouse=hdfs://hadoop-master:9000/user/hive/warehouse +spark.sql.catalog.iceberg_test.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog +spark.sql.catalog.iceberg_test.uri=jdbc:postgresql://postgresql:25432/test +spark.sql.catalog.iceberg_test.jdbc.user=test +spark.sql.catalog.iceberg_test.jdbc.password=test + +spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index 66d2b24f962d..bb1299135bb7 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -67,6 +67,7 @@ public final class TestGroups public static final String KAFKA
= "kafka"; public static final String TWO_HIVES = "two_hives"; public static final String ICEBERG = "iceberg"; + public static final String ICEBERG_JDBC = "iceberg_jdbc"; public static final String AVRO = "avro"; public static final String PHOENIX = "phoenix"; public static final String CLICKHOUSE = "clickhouse"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index b885e6958f5a..f684072b2a08 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airlift.concurrent.MoreFutures; +import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.ProductTest; import io.trino.tempto.hadoop.hdfs.HdfsClient; import io.trino.tempto.query.QueryExecutionException; @@ -24,6 +25,7 @@ import io.trino.tempto.query.QueryResult; import io.trino.tests.product.hive.Engine; import org.assertj.core.api.Assertions; +import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -55,6 +57,7 @@ import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.tests.product.TestGroups.ICEBERG; +import static io.trino.tests.product.TestGroups.ICEBERG_JDBC; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AND_INSERT; @@ -84,13 +87,21 @@ public class TestIcebergSparkCompatibility private static final String TRINO_CATALOG = "iceberg"; private static final String TEST_SCHEMA_NAME = "default"; - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "unsupportedStorageFormats") + @BeforeTestWithContext + public void setUp() + { + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS " + TRINO_CATALOG + "." 
+ TEST_SCHEMA_NAME); + } + + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "unsupportedStorageFormats") public void testTrinoWithUnsupportedFileFormat(StorageFormat storageFormat) { - String tableName = "test_trino_unsupported_file_format_" + storageFormat; + String tableName = "test_trino_unsupported_file_format_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(tableName); String sparkTableName = sparkTableName(tableName); + onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); + onSpark().executeQuery(format("CREATE TABLE %s (x bigint) USING ICEBERG TBLPROPERTIES ('write.format.default'='%s')", sparkTableName, storageFormat)); onSpark().executeQuery(format("INSERT INTO %s VALUES (42)", sparkTableName)); @@ -102,10 +113,10 @@ public void testTrinoWithUnsupportedFileFormat(StorageFormat storageFormat) onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_primitive_types_" + storageFormat; + String baseTableName = "test_trino_reading_primitive_types_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -198,13 +209,15 @@ public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersi onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSparkReadingTrinoDataDataProvider") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSparkReadingTrinoDataDataProvider") public void testSparkReadingTrinoData(StorageFormat storageFormat, CreateMode createMode) { - String baseTableName = "test_spark_reading_primitive_types_" + storageFormat + "_" + createMode; + String baseTableName = "test_spark_reading_primitive_types_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + createMode.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + String namedValues = "SELECT " + " VARCHAR 'a_string' _string " + ", 1000000000000000 _bigint " + @@ -324,13 +337,15 @@ public Object[][] testSparkReadingTrinoDataDataProvider() .toArray(Object[][]::new); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadTrinoUuid(StorageFormat storageFormat) { - String tableName = "test_spark_read_trino_uuid_" + storageFormat; + String tableName = "test_spark_read_trino_uuid_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(tableName); String sparkTableName = sparkTableName(tableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + onTrino().executeQuery(format( "CREATE TABLE %s AS SELECT UUID '406caec7-68b9-4778-81b2-a12ece70c8b1' u", trinoTableName)); @@ -339,12 +354,12 @@ public void testSparkReadTrinoUuid(StorageFormat storageFormat) // TODO Iceberg Spark integration needs yet to gain support // once this is 
supported, merge this test with testSparkReadingTrinoData() .isInstanceOf(SQLException.class) - .hasMessageMatching("org.apache.hive.service.cli.HiveSQLException: Error running query:.*\\Q java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.unsafe.types.UTF8String\\E(?s:.*)"); + .hasMessageMatching("org.apache.hive.service.cli.HiveSQLException: Error running query:.*\\Q java.lang.ClassCastException\\E(?s:.*)"); onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") public void testSparkCreatesTrinoDrops(int specVersion) { String baseTableName = "test_spark_creates_trino_drops"; @@ -352,7 +367,7 @@ public void testSparkCreatesTrinoDrops(int specVersion) onTrino().executeQuery("DROP TABLE " + trinoTableName(baseTableName)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testTrinoCreatesSparkDrops() { String baseTableName = "test_trino_creates_spark_drops"; @@ -360,10 +375,10 @@ public void testTrinoCreatesSparkDrops() onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) { - String baseTableName = "test_spark_reads_trino_partitioned_table_" + storageFormat; + String baseTableName = "test_spark_reads_trino_partitioned_table_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -388,10 +403,10 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reads_spark_partitioned_table_" + storageFormat; + String baseTableName = "test_trino_reads_spark_partitioned_table_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); @@ -420,10 +435,10 @@ public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_spark_composites_" + storageFormat; + String baseTableName = "test_trino_reading_spark_composites_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ 
-448,10 +463,10 @@ public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) { - String baseTableName = "test_spark_reading_trino_composites_" + storageFormat; + String baseTableName = "test_spark_reading_trino_composites_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -475,10 +490,10 @@ public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_spark_iceberg_table_properties_" + storageFormat; + String baseTableName = "test_trino_reading_spark_iceberg_table_properties_" + storageFormat.name().toLowerCase(ENGLISH); String propertiesTableName = "\"" + baseTableName + "$properties\""; String sparkTableName = sparkTableName(baseTableName); String trinoPropertiesTableName = trinoTableName(propertiesTableName); @@ -503,10 +518,10 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_nested_spark_data_" + storageFormat; + String baseTableName = "test_trino_reading_nested_spark_data_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -560,10 +575,10 @@ public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int spe onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) { - String baseTableName = "test_spark_reading_nested_trino_data_" + storageFormat; + String baseTableName = "test_spark_reading_nested_trino_data_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -617,10 +632,10 @@ public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testIdBasedFieldMapping(StorageFormat storageFormat, int 
specVersion) { - String baseTableName = "test_schema_evolution_for_nested_fields_" + storageFormat; + String baseTableName = "test_schema_evolution_for_nested_fields_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -735,10 +750,10 @@ public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_read_after_partition_evolution_" + storageFormat; + String baseTableName = "test_read_after_partition_evolution_" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -813,23 +828,26 @@ public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int spe onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") public void testTrinoShowingSparkCreatedTables(int specVersion) { String sparkTable = "test_table_listing_for_spark"; String trinoTable = "test_table_listing_for_trino"; + onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTable); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTable); + onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG TBLPROPERTIES('format-version' = %s)", sparkTableName(sparkTable), specVersion)); onTrino().executeQuery(format("CREATE TABLE %s (_integer INTEGER )", trinoTableName(trinoTable))); - assertThat(onTrino().executeQuery(format("SHOW TABLES FROM %s LIKE '%s'", TEST_SCHEMA_NAME, "test_table_listing_for_%"))) + assertThat(onTrino().executeQuery(format("SHOW TABLES FROM %s.%s LIKE '%s'", TRINO_CATALOG, TEST_SCHEMA_NAME, "test_table_listing_for_%"))) .containsOnly(row(sparkTable), row(trinoTable)); onSpark().executeQuery("DROP TABLE " + sparkTableName(sparkTable)); onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion) { String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data"; @@ -848,7 +866,7 @@ public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion) onSpark().executeQuery(format("DROP TABLE %s", sparkTableName(tableSameLocation2))); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) { String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data"; @@ -868,10 +886,10 @@ public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) // Can't clean up tableSameLocation2 as all data and metadata has been removed 
} - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_object_storage_location_provider_" + storageFormat; + String baseTableName = "test_object_storage_location_provider_" + storageFormat.name().toLowerCase(ENGLISH); String sparkTableName = sparkTableName(baseTableName); String trinoTableName = trinoTableName(baseTableName); String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_object_storage_location_provider/obj-data"; @@ -898,10 +916,10 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_writer_data_path_" + storageFormat; + String baseTableName = "test_writer_data_path_" + storageFormat.name().toLowerCase(ENGLISH); String sparkTableName = sparkTableName(baseTableName); String trinoTableName = trinoTableName(baseTableName); String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_writer_data_path_/obj-data"; @@ -965,7 +983,7 @@ public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageForma Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), ((value, index) -> row((int) index, value))) .collect(toImmutableList()); - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testStringPartitioningWithSpecialCharactersCtasInTrino() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino"; @@ -983,7 +1001,7 @@ public void testStringPartitioningWithSpecialCharactersCtasInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testStringPartitioningWithSpecialCharactersInsertInTrino() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino"; @@ -999,7 +1017,7 @@ public void testStringPartitioningWithSpecialCharactersInsertInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testStringPartitioningWithSpecialCharactersInsertInSpark() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_spark"; @@ -1015,7 +1033,7 @@ public void testStringPartitioningWithSpecialCharactersInsertInSpark() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testInsertReadingFromParquetTableWithNestedRowFieldNotPresentInDataFile() { // regression test for https://github.com/trinodb/trino/issues/9264 @@ -1099,7 +1117,7 @@ private void assertSelectsOnSpecialCharacters(String trinoTableName, String spar /** * @see TestIcebergInsert#testIcebergConcurrentInsert() */ - @Test(groups = 
{ICEBERG, PROFILE_SPECIFIC_TESTS}, timeOut = 60_000) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, timeOut = 60_000) public void testTrinoSparkConcurrentInsert() throws Exception { @@ -1166,12 +1184,12 @@ public void testTrinoSparkConcurrentInsert() } } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsAndCompressionCodecs") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsAndCompressionCodecs") public void testTrinoReadingSparkCompressedData(StorageFormat storageFormat, String compressionCodec) { String baseTableName = "test_spark_compression" + - "_" + storageFormat + - "_" + compressionCodec + + "_" + storageFormat.name().toLowerCase(ENGLISH) + + "_" + compressionCodec.toLowerCase(ENGLISH) + "_" + randomTableSuffix(); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -1214,12 +1232,12 @@ public void testTrinoReadingSparkCompressedData(StorageFormat storageFormat, Str onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsAndCompressionCodecs") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsAndCompressionCodecs") public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, String compressionCodec) { String baseTableName = "test_trino_compression" + - "_" + storageFormat + - "_" + compressionCodec + + "_" + storageFormat.name().toLowerCase(ENGLISH) + + "_" + compressionCodec.toLowerCase(ENGLISH) + "_" + randomTableSuffix(); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -1246,7 +1264,7 @@ public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, Str onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void verifyCompressionCodecsDataProvider() { assertThat(onTrino().executeQuery("SHOW SESSION LIKE 'iceberg.compression_codec'")) @@ -1279,7 +1297,7 @@ private List compressionCodecs() "GZIP"); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) { String baseTableName = "test_trino_reading_migrated_nested_data_" + randomTableSuffix(); @@ -1292,7 +1310,7 @@ public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) ", nested_array ARRAY>>>\n" + ", nested_struct STRUCT>)\n" + " USING %s"; - onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name())); + onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name().toLowerCase(ENGLISH))); String insert = "" + "INSERT INTO TABLE %s SELECT" + @@ -1302,7 +1320,14 @@ public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) " ,map('m2', array(named_struct('mName', 'MAS2Name1', 'mNumber', 401), named_struct('mName', 'MAS2Name2', 'mNumber', 402))))" + ", named_struct('id', 1, 'name', 'P. 
Sherman', 'address', named_struct('street_number', 42, 'street_name', 'Wallaby Way'))"; onSpark().executeQuery(format(insert, defaultCatalogTableName)); - onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + try { + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + } + catch (QueryExecutionException e) { + if (e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) { + throw new SkipException("This catalog doesn't support calling system.migrate procedure"); + } + } String sparkTableName = sparkTableName(baseTableName); Row row = row("Doc213", "ASName2", 201, "MAS2Name1", 302, "P. Sherman", 42, "Wallaby Way"); @@ -1344,7 +1369,7 @@ public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat) assertThat(onTrino().executeQuery("SELECT nested_struct.address.street_number, nested_struct.address.street_name FROM " + trinoTableName)).containsOnly(row(null, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) { String baseTableName = "test_migrated_data_with_altered_schema_" + randomTableSuffix(); @@ -1355,14 +1380,21 @@ public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) " doc_id STRING\n" + ", nested_struct STRUCT>)\n" + " USING %s"; - onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name())); + onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat)); String insert = "" + "INSERT INTO TABLE %s SELECT" + " 'Doc213'" + ", named_struct('id', 1, 'name', 'P. 
Sherman', 'address', named_struct('a', 42, 'b', 'Wallaby Way'))"; onSpark().executeQuery(format(insert, defaultCatalogTableName)); - onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + try { + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + } + catch (QueryExecutionException e) { + if (e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) { + throw new SkipException("This catalog doesn't support calling system.migrate procedure"); + } + } String sparkTableName = sparkTableName(baseTableName); onSpark().executeQuery("ALTER TABLE " + sparkTableName + " RENAME COLUMN nested_struct TO nested_struct_moved"); @@ -1385,18 +1417,25 @@ public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) assertThat(onTrino().executeQuery(select + trinoTableName)).containsOnly(row(null, null, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) { String baseTableName = "test_migrated_data_with_partial_name_mapping_" + randomTableSuffix(); String defaultCatalogTableName = sparkDefaultCatalogTableName(baseTableName); - String sparkTableDefinition = "CREATE TABLE %s (a INT, b INT) USING " + storageFormat.name(); + String sparkTableDefinition = "CREATE TABLE %s (a INT, b INT) USING " + storageFormat.name().toLowerCase(ENGLISH); onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName)); String insert = "INSERT INTO TABLE %s SELECT 1, 2"; onSpark().executeQuery(format(insert, defaultCatalogTableName)); - onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + try { + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + } + catch (QueryExecutionException e) { + if (e.getMessage().contains("Cannot use catalog spark_catalog: not a ProcedureCatalog")) { + throw new SkipException("This catalog doesn't support calling system.migrate procedure"); + } + } String sparkTableName = sparkTableName(baseTableName); String trinoTableName = trinoTableName(baseTableName); @@ -1408,7 +1447,7 @@ public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) .containsOnly(row(1, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testPartialStats() { String tableName = "test_partial_stats_" + randomTableSuffix(); @@ -1428,7 +1467,7 @@ public void testPartialStats() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testStatsAfterAddingPartitionField() { String tableName = "test_stats_after_adding_partition_field_" + randomTableSuffix(); @@ -1454,10 +1493,10 @@ public void testStatsAfterAddingPartitionField() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { - String tableName = format("test_trino_reads_spark_row_level_deletes_%s_%s_%s", 
tableStorageFormat.name(), deleteFileStorageFormat.name(), randomTableSuffix()); + String tableName = format("test_trino_reads_spark_row_level_deletes_%s_%s_%s", tableStorageFormat.name().toLowerCase(ENGLISH), deleteFileStorageFormat.name().toLowerCase(ENGLISH), randomTableSuffix()); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1487,10 +1526,10 @@ public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { - String tableName = format("test_trino_reads_spark_row_level_deletes_row_types_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomTableSuffix()); + String tableName = format("test_trino_reads_spark_row_level_deletes_row_types_%s_%s_%s", tableStorageFormat.name().toLowerCase(ENGLISH), deleteFileStorageFormat.name().toLowerCase(ENGLISH), randomTableSuffix()); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1512,10 +1551,10 @@ public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableSt onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) { - String tableName = format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), randomTableSuffix()); + String tableName = format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name().toLowerCase(ENGLISH), randomTableSuffix()); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1539,10 +1578,10 @@ public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat) { - String tableName = format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), randomTableSuffix()); + String tableName = format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name().toLowerCase(ENGLISH), randomTableSuffix()); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1558,10 +1597,10 @@ public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storage onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testDeleteAfterPartitionEvolution(StorageFormat storageFormat) { - String baseTableName = "test_delete_after_partition_evolution_" + storageFormat + randomTableSuffix(); + String baseTableName = 
"test_delete_after_partition_evolution_" + storageFormat.name().toLowerCase(ENGLISH) + randomTableSuffix(); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -1611,7 +1650,7 @@ public void testDeleteAfterPartitionEvolution(StorageFormat storageFormat) onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testMissingMetrics() { String tableName = "test_missing_metrics_" + randomTableSuffix(); @@ -1625,7 +1664,7 @@ public void testMissingMetrics() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testOptimizeOnV2IcebergTable() { String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomTableSuffix()); @@ -1738,13 +1777,13 @@ public enum CreateMode { CREATE_TABLE_AND_INSERT, CREATE_TABLE_AS_SELECT, - CREATE_TABLE_WITH_NO_DATA_AND_INSERT, + CREATE_TABLE_WITH_NO_DATA_AND_INSERT } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) { - String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots" + storageFormat; + String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1784,7 +1823,7 @@ public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat; + String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1825,10 +1864,10 @@ public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat sto onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat; + String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " 
+ trinoTableName); @@ -1857,10 +1896,10 @@ public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(Stor onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { - String baseTableName = "test_cleaning_up_iceberg_table_fails_for_table_v2" + tableStorageFormat; + String baseTableName = "test_cleaning_up_iceberg_table_fails_for_table_v2" + tableStorageFormat.name().toLowerCase(ENGLISH); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1894,7 +1933,7 @@ public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableSto .containsOnly(row); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testUpdateAfterSchemaEvolution() { String baseTableName = "test_update_after_schema_evolution_" + randomTableSuffix(); @@ -1936,7 +1975,7 @@ public void testUpdateAfterSchemaEvolution() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testUpdateOnPartitionColumn() { String baseTableName = "test_update_on_partition_column" + randomTableSuffix(); @@ -1969,7 +2008,7 @@ public void testUpdateOnPartitionColumn() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}) public void testHandlingPartitionSchemaEvolutionInPartitionMetadata() { String baseTableName = "test_handling_partition_schema_evolution_" + randomTableSuffix();
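
For reference, the Spark catalog defined in ``spark-defaults.conf`` and the Trino catalog defined in ``iceberg.properties`` above both point at the same PostgreSQL-backed Iceberg JDBC catalog, keyed by the catalog name ``iceberg_test``. The sketch below is not part of this change; the class name is illustrative, and it assumes the Iceberg core library and the PostgreSQL JDBC driver are on the classpath and that the exposed PostgreSQL port (25432) is reachable from the host running it. It shows how that shared metadata could be listed directly through Iceberg's ``org.apache.iceberg.jdbc.JdbcCatalog``, the same implementation Spark is configured to use via ``catalog-impl``:

.. code-block:: java

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.jdbc.JdbcCatalog;

    public final class JdbcCatalogSmokeTest
    {
        private JdbcCatalogSmokeTest() {}

        public static void main(String[] args)
        {
            Map<String, String> properties = new HashMap<>();
            // Connection details mirror iceberg.properties and spark-defaults.conf above;
            // adjust the host/port to wherever the PostgreSQL container is reachable from
            properties.put(CatalogProperties.URI, "jdbc:postgresql://localhost:25432/test");
            properties.put(CatalogProperties.WAREHOUSE_LOCATION, "hdfs://hadoop-master:9000/user/hive/warehouse");
            properties.put("jdbc.user", "test");
            properties.put("jdbc.password", "test");

            JdbcCatalog catalog = new JdbcCatalog();
            catalog.setConf(new Configuration());
            // "iceberg_test" is the catalog name recorded in the catalog_name column of
            // iceberg_tables/iceberg_namespace_properties, i.e. the value configured as
            // iceberg.metastore.jdbc.catalogid on the Trino side and as the Spark catalog name
            catalog.initialize("iceberg_test", properties);

            // Print every table registered in the shared JDBC catalog, regardless of
            // whether it was created from Spark or from Trino
            for (Namespace namespace : catalog.listNamespaces()) {
                for (TableIdentifier table : catalog.listTables(namespace)) {
                    System.out.println(table);
                }
            }
        }
    }

Listing namespaces and tables only queries the two PostgreSQL tables created by ``create-table.sql``; loading a table would additionally read its metadata JSON from the ``metadata_location`` under the warehouse directory, so it also needs access to the HDFS warehouse.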