Skip to content

Commit

Permalink
Support JDBC catalog in Iceberg connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jan 19, 2023
1 parent 7facf0e commit d868e91
Show file tree
Hide file tree
Showing 29 changed files with 1,739 additions and 65 deletions.
27 changes: 26 additions & 1 deletion docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Configuration

The connector supports multiple Iceberg catalog types, you may use either a Hive
metastore service (HMS), AWS Glue, or a REST catalog. The catalog type is determined by the
``iceberg.catalog.type`` property, it can be set to ``HIVE_METASTORE``, ``GLUE``, or ``REST``.
``iceberg.catalog.type`` property, it can be set to ``HIVE_METASTORE``, ``GLUE``, ``JDBC``, or ``REST``.


.. _iceberg-hive-catalog:
Expand Down Expand Up @@ -80,6 +80,8 @@ configuration properties as the Hive connector's Glue setup. See
connector.name=iceberg
iceberg.catalog.type=glue
.. _iceberg-rest-catalog:

REST catalog
^^^^^^^^^^^^^^

Expand Down Expand Up @@ -118,6 +120,29 @@ Property Name Description
iceberg.rest-catalog.uri=http://iceberg-with-rest:8181
.. _iceberg-jdbc-catalog:

JDBC catalog
^^^^^^^^^^^^

.. warning::

The JDBC catalog may face the compatibility issue if Iceberg introduces breaking changes in the future.
Consider the :ref:`REST catalog <iceberg-rest-catalog>` as an alternative solution.

At a minimum, ``iceberg.jdbc-catalog.connection-url`` and
``iceberg.jdbc-catalog.catalog-name`` must be configured.
When using any database besides PostgreSQL, a JDBC driver jar file must be placed in the plugin directory.

.. code-block:: text
connector.name=iceberg
iceberg.catalog.type=jdbc
iceberg.jdbc-catalog.catalog-name=test
iceberg.jdbc-catalog.connection-url=jdbc:postgresql://example.net:5432/database?user=admin&password=test
iceberg.jdbc-catalog.default-warehouse-dir=s3://bucket
General configuration
^^^^^^^^^^^^^^^^^^^^^

Expand Down
23 changes: 23 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@
<artifactId>iceberg-parquet</artifactId>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down Expand Up @@ -297,6 +302,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>

<!-- Trino SPI -->
<dependency>
<groupId>io.trino</groupId>
Expand Down Expand Up @@ -452,6 +463,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public enum CatalogType
HIVE_METASTORE,
GLUE,
REST,
JDBC,
/**/;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.jdbc.IcebergJdbcCatalogModule;
import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogModule;

import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.trino.plugin.iceberg.CatalogType.GLUE;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.CatalogType.JDBC;
import static io.trino.plugin.iceberg.CatalogType.REST;
import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE;

Expand All @@ -39,6 +41,7 @@ protected void setup(Binder binder)
bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule());
bindCatalogModule(GLUE, new IcebergGlueCatalogModule());
bindCatalogModule(REST, new IcebergRestCatalogModule());
bindCatalogModule(JDBC, new IcebergJdbcCatalogModule());
}

private void bindCatalogModule(CatalogType catalogType, Module module)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.jdbc;

import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergJdbcCatalogModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(IcebergJdbcConfig.class);
binder.bind(IcebergTableOperationsProvider.class).to(IcebergJdbcTableOperationsProvider.class).in(Scopes.SINGLETON);
newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName();
binder.bind(TrinoCatalogFactory.class).to(TrinoJdbcCatalogFactory.class).in(Scopes.SINGLETON);
binder.bind(TrinoJdbcCatalogFactory.class);
newExporter(binder).export(TrinoJdbcCatalogFactory.class).withGeneratedName();
}

@Provides
@Singleton
public static IcebergJdbcClient createIcebergJdbcClient(IcebergJdbcConfig config)
{
return new IcebergJdbcClient(
new IcebergJdbcConnectionFactory(config.getConnectionUrl()),
config.getCatalogName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.jdbc;

import org.apache.iceberg.exceptions.CommitFailedException;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class IcebergJdbcClient
{
private final IcebergJdbcConnectionFactory connectionFactory;
private final String catalogName;

public IcebergJdbcClient(IcebergJdbcConnectionFactory connectionFactory, String catalogName)
{
this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
}

public void createTable(String schemaName, String tableName, String metadataLocation)
{
try (Handle handle = Jdbi.open(connectionFactory)) {
handle.createUpdate("" +
"INSERT INTO iceberg_tables " +
"(catalog_name, table_namespace, table_name, metadata_location, previous_metadata_location) " +
"VALUES (:catalog, :schema, :table, :metadata_location, null)")
.bind("catalog", catalogName)
.bind("schema", schemaName)
.bind("table", tableName)
.bind("metadata_location", metadataLocation)
.execute();
}
}

public void alterTable(String schemaName, String tableName, String newMetadataLocation, String previousMetadataLocation)
{
try (Handle handle = Jdbi.open(connectionFactory)) {
int updatedRecords = handle.createUpdate("" +
"UPDATE iceberg_tables " +
"SET metadata_location = :metadata_location, previous_metadata_location = :previous_metadata_location " +
"WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table AND metadata_location = :previous_metadata_location")
.bind("metadata_location", newMetadataLocation)
.bind("previous_metadata_location", previousMetadataLocation)
.bind("catalog", catalogName)
.bind("schema", schemaName)
.bind("table", tableName)
.execute();
if (updatedRecords != 1) {
throw new CommitFailedException("Failed to update table due to concurrent updates");
}
}
}

public Optional<String> getMetadataLocation(String schemaName, String tableName)
{
try (Handle handle = Jdbi.open(connectionFactory)) {
return handle.createQuery("" +
"SELECT metadata_location " +
"FROM iceberg_tables " +
"WHERE catalog_name = :catalog AND table_namespace = :schema AND table_name = :table")
.bind("catalog", catalogName)
.bind("schema", schemaName)
.bind("table", tableName)
.mapTo(String.class)
.findOne();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.jdbc;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;

import javax.validation.constraints.NotEmpty;

public class IcebergJdbcConfig
{
private String connectionUrl;
private String catalogName;
private String defaultWarehouseDir;

public String getConnectionUrl()
{
return connectionUrl;
}

@Config("iceberg.jdbc-catalog.connection-url")
@ConfigDescription("The URI to connect to the JDBC server")
@ConfigSecuritySensitive
public IcebergJdbcConfig setConnectionUrl(String connectionUrl)
{
this.connectionUrl = connectionUrl;
return this;
}

@NotEmpty
public String getCatalogName()
{
return catalogName;
}

@Config("iceberg.jdbc-catalog.catalog-name")
@ConfigDescription("Iceberg JDBC metastore catalog name")
public IcebergJdbcConfig setCatalogName(String catalogName)
{
this.catalogName = catalogName;
return this;
}

@NotEmpty
public String getDefaultWarehouseDir()
{
return defaultWarehouseDir;
}

@Config("iceberg.jdbc-catalog.default-warehouse-dir")
@ConfigDescription("The default warehouse directory to use for JDBC")
public IcebergJdbcConfig setDefaultWarehouseDir(String defaultWarehouseDir)
{
this.defaultWarehouseDir = defaultWarehouseDir;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.jdbc;

import org.jdbi.v3.core.ConnectionFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class IcebergJdbcConnectionFactory
implements ConnectionFactory
{
private final String connectionUrl;

public IcebergJdbcConnectionFactory(String connectionUrl)
{
this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null");
}

@Override
public Connection openConnection()
throws SQLException
{
Connection connection = DriverManager.getConnection(connectionUrl);
checkState(connection != null, "Driver returned null connection, make sure the connection URL is valid");
return connection;
}
}
Loading

0 comments on commit d868e91

Please sign in to comment.