Skip to content

Commit

Permalink
Support registering and unregistering tables in Iceberg JDBC catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 14, 2023
1 parent d630d35 commit 6d00587
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class TrinoJdbcCatalog
extends AbstractTrinoCatalog
{
private final JdbcCatalog jdbcCatalog;
private final IcebergJdbcClient jdbcClient;
private final TrinoFileSystemFactory fileSystemFactory;
private final String defaultWarehouseDir;
private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
Expand All @@ -72,12 +74,14 @@ public TrinoJdbcCatalog(
TypeManager typeManager,
IcebergTableOperationsProvider tableOperationsProvider,
JdbcCatalog jdbcCatalog,
IcebergJdbcClient jdbcClient,
TrinoFileSystemFactory fileSystemFactory,
boolean useUniqueTableLocation,
String defaultWarehouseDir)
{
super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation);
this.jdbcCatalog = requireNonNull(jdbcCatalog, "jdbcCatalog is null");
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.defaultWarehouseDir = requireNonNull(defaultWarehouseDir, "defaultWarehouseDir is null");
}
Expand Down Expand Up @@ -186,13 +190,17 @@ public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTab
@Override
public void registerTable(ConnectorSession session, SchemaTableName tableName, String tableLocation, String metadataLocation)
{
throw new TrinoException(NOT_SUPPORTED, "registerTable is not supported for Iceberg JDBC catalogs");
// Using IcebergJdbcClient because JdbcCatalog.registerTable causes the below error.
// "Cannot invoke "org.apache.iceberg.util.SerializableSupplier.get()" because "this.hadoopConf" is null"
jdbcClient.createTable(tableName.getSchemaName(), tableName.getTableName(), metadataLocation);
}

@Override
public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
{
throw new TrinoException(NOT_SUPPORTED, "unregisterTable is not supported for Iceberg JDBC catalogs");
if (!jdbcCatalog.dropTable(toIdentifier(tableName), false)) {
throw new TableNotFoundException(tableName);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class TrinoJdbcCatalogFactory
private final TypeManager typeManager;
private final IcebergTableOperationsProvider tableOperationsProvider;
private final TrinoFileSystemFactory fileSystemFactory;
private final IcebergJdbcClient jdbcClient;
private final String jdbcCatalogName;
private final String connectionUrl;
private final Optional<String> connectionUser;
Expand All @@ -57,6 +58,7 @@ public TrinoJdbcCatalogFactory(
TypeManager typeManager,
IcebergTableOperationsProvider tableOperationsProvider,
TrinoFileSystemFactory fileSystemFactory,
IcebergJdbcClient jdbcClient,
IcebergJdbcCatalogConfig jdbcConfig,
IcebergConfig icebergConfig)
{
Expand All @@ -65,6 +67,7 @@ public TrinoJdbcCatalogFactory(
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.isUniqueTableLocation = requireNonNull(icebergConfig, "icebergConfig is null").isUniqueTableLocation();
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.jdbcCatalogName = jdbcConfig.getCatalogName();
this.connectionUrl = jdbcConfig.getConnectionUrl();
this.connectionUser = jdbcConfig.getConnectionUser();
Expand All @@ -84,6 +87,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
typeManager,
tableOperationsProvider,
icebergCatalog,
jdbcClient,
fileSystemFactory,
isUniqueTableLocation,
defaultWarehouseDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@
package io.trino.plugin.iceberg.catalog.jdbc;

import com.google.common.collect.ImmutableMap;
import io.trino.hadoop.ConfigurationInstantiator;
import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.testng.annotations.AfterClass;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

Expand All @@ -29,11 +36,17 @@
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
import static java.lang.String.format;
import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
import static org.apache.iceberg.CatalogProperties.URI;
import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION;
import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
import static org.apache.iceberg.jdbc.JdbcCatalog.PROPERTY_PREFIX;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestIcebergJdbcCatalogConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
private JdbcCatalog jdbcCatalog;
private File warehouseLocation;

public TestIcebergJdbcCatalogConnectorSmokeTest()
Expand All @@ -60,6 +73,14 @@ protected QueryRunner createQueryRunner()
warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_catalog_smoke_test").toFile();
closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE));
TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer());
jdbcCatalog = (JdbcCatalog) buildIcebergCatalog("tpch", ImmutableMap.<String, String>builder()
.put(CATALOG_IMPL, JdbcCatalog.class.getName())
.put(URI, server.getJdbcUrl())
.put(PROPERTY_PREFIX + "user", USER)
.put(PROPERTY_PREFIX + "password", PASSWORD)
.put(WAREHOUSE_LOCATION, warehouseLocation.getAbsolutePath())
.buildOrThrow(),
ConfigurationInstantiator.newEmptyConfiguration());
return IcebergQueryRunner.builder()
.setIcebergProperties(
ImmutableMap.<String, String>builder()
Expand All @@ -76,6 +97,13 @@ protected QueryRunner createQueryRunner()
.build();
}

@AfterClass(alwaysRun = true)
public final void destroy()
{
jdbcCatalog.close();
jdbcCatalog = null;
}

@Override
public void testView()
{
Expand All @@ -100,14 +128,14 @@ public void testRenameSchema()
@Override
protected void dropTableFromMetastore(String tableName)
{
// used when registering a table, which is not supported by the JDBC catalog
jdbcCatalog.dropTable(toIdentifier(tableName), false);
}

@Override
protected String getMetadataLocation(String tableName)
{
// used when registering a table, which is not supported by the JDBC catalog
throw new UnsupportedOperationException("metadata location for register_table is not supported");
BaseTable table = (BaseTable) jdbcCatalog.loadTable(toIdentifier(tableName));
return table.operations().current().metadataFileLocation();
}

@Override
Expand All @@ -122,79 +150,19 @@ protected boolean locationExists(String location)
return Files.exists(Path.of(location));
}

@Override
public void testRegisterTableWithTableLocation()
{
assertThatThrownBy(super::testRegisterTableWithTableLocation)
.hasMessageContaining("registerTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testRegisterTableWithComments()
{
assertThatThrownBy(super::testRegisterTableWithComments)
.hasMessageContaining("registerTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testRegisterTableWithShowCreateTable()
{
assertThatThrownBy(super::testRegisterTableWithShowCreateTable)
.hasMessageContaining("registerTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testRegisterTableWithReInsert()
{
assertThatThrownBy(super::testRegisterTableWithReInsert)
.hasMessageContaining("registerTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testRegisterTableWithDifferentTableName()
{
assertThatThrownBy(super::testRegisterTableWithDifferentTableName)
.hasMessageContaining("registerTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testRegisterTableWithMetadataFile()
{
assertThatThrownBy(super::testRegisterTableWithMetadataFile)
.hasMessageContaining("metadata location for register_table is not supported");
}

@Override
public void testUnregisterTable()
{
assertThatThrownBy(super::testUnregisterTable)
.hasMessageContaining("unregisterTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testUnregisterBrokenTable()
{
assertThatThrownBy(super::testUnregisterBrokenTable)
.hasMessageContaining("unregisterTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testUnregisterTableNotExistingTable()
{
assertThatThrownBy(super::testUnregisterTableNotExistingTable)
.hasMessageContaining("unregisterTable is not supported for Iceberg JDBC catalogs");
}

@Override
public void testRepeatUnregisterTable()
private TableIdentifier toIdentifier(String tableName)
{
assertThatThrownBy(super::testRepeatUnregisterTable)
.hasMessageContaining("unregisterTable is not supported for Iceberg JDBC catalogs");
return TableIdentifier.of(getSession().getSchema().orElseThrow(), tableName);
}

@Override
protected void deleteDirectory(String location)
{
// used when unregistering a table, which is not supported by the JDBC catalog
try {
deleteRecursively(Path.of(location), ALLOW_INSECURE);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

0 comments on commit 6d00587

Please sign in to comment.