Support Glue catalog in Iceberg connector
Co-authored-by: Alex <jo.alex2144@gmail.com>
jackye1995 and alexjo2144 committed Mar 14, 2022
1 parent fb7124f commit 30ab6ac
Showing 31 changed files with 1,744 additions and 184 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
@@ -459,6 +459,15 @@ jobs:
if: matrix.modules == 'plugin/trino-bigquery' && env.BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY != ''
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-case-insensitive-mapping -Dbigquery.credentials-key="${BIGQUERY_CASE_INSENSITIVE_CREDENTIALS_KEY}"
- name: Iceberg Glue Catalog Tests
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESSKEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRETKEY }}
AWS_REGION: us-east-2
S3_BUCKET: presto-ci-test
if: contains(matrix.modules, 'plugin/trino-iceberg') && (env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-iceberg -P test-glue-catalog -Ds3.bucket=${S3_BUCKET}
- name: Sanitize artifact name
if: always()
run: |
@@ -41,6 +41,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

@@ -126,12 +127,22 @@ private static CoralTableRedirectionResolver coralTableRedirectionResolver(

public static boolean isPrestoView(Table table)
{
return "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG));
return isPrestoView(table.getParameters());
}

public static boolean isPrestoView(Map<String, String> tableParameters)
{
return "true".equals(tableParameters.get(PRESTO_VIEW_FLAG));
}

public static boolean isHiveOrPrestoView(Table table)
{
return table.getTableType().equals(TableType.VIRTUAL_VIEW.name());
return isHiveOrPrestoView(table.getTableType());
}

public static boolean isHiveOrPrestoView(String tableType)
{
return tableType.equals(TableType.VIRTUAL_VIEW.name());
}

public static boolean canDecodeView(Table table)
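These new overloads let a caller check the Presto-view flag and the Hive view type from raw metadata, without first materializing a full Hive `Table` object. That matters for the Glue catalog, whose API hands back table parameters and the table type directly. A minimal sketch of such a caller, assuming these helpers sit in the Hive plugin's `ViewReaderUtil` (the file name is not shown in this excerpt) and using the AWS SDK v1 Glue model:

```java
// NOTE: the import location of the helpers is an assumption; the excerpt
// does not name the class that declares them.
import com.amazonaws.services.glue.model.Table;

import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView;
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;

public final class GlueViewCheckExample
{
    private GlueViewCheckExample() {}

    // Classify a Glue table entry without converting it to a Hive Table:
    // the table type string and parameters map come straight off the Glue model.
    public static boolean isTrinoView(Table glueTable)
    {
        return isHiveOrPrestoView(glueTable.getTableType())
                && isPrestoView(glueTable.getParameters());
    }
}
```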
@@ -202,7 +202,7 @@ public GlueHiveMetastore(
this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats);
}

private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
{
ClientConfiguration clientConfig = new ClientConfiguration()
.withMaxConnections(config.getMaxGlueConnections())
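Widening `createAsyncGlueClient` from `private` to `public` lets the new Iceberg Glue catalog reuse the Hive connector's client construction (connection limits, credentials, endpoint handling) rather than duplicating it. A hedged sketch of such a reuse point; the wrapper class here is illustrative, not the commit's actual call site:

```java
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.glue.AWSGlueAsync;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;

import java.util.Optional;

import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient;

// Illustrative wrapper: build the shared async Glue client with no extra
// request handler and the SDK's no-op metric collector.
public final class GlueClientFactoryExample
{
    private GlueClientFactoryExample() {}

    public static AWSGlueAsync createClient(GlueHiveMetastoreConfig config)
    {
        return createAsyncGlueClient(config, Optional.empty(), RequestMetricCollector.NONE);
    }
}
```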
37 changes: 37 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -84,6 +84,21 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
@@ -320,6 +335,9 @@
<configuration>
<excludes>
<exclude>**/TestIceberg*FailureRecoveryTest.java</exclude>
<exclude>**/TestIcebergGlueCatalogConnectorSmokeTest.java</exclude>
<exclude>**/TestTrinoGlueCatalogTest.java</exclude>
<exclude>**/TestSharedGlueMetastore.java</exclude>
</excludes>
</configuration>
</plugin>
@@ -371,5 +389,24 @@
</plugins>
</build>
</profile>

<profile>
<id>test-glue-catalog</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestIcebergGlueCatalogConnectorSmokeTest.java</include>
<include>**/TestTrinoGlueCatalogTest.java</include>
<include>**/TestSharedGlueMetastore.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
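Together with the surefire excludes above, this profile keeps the Glue tests out of the default build and lets CI (or a developer with AWS credentials) opt in with `-P test-glue-catalog`, mirroring the new workflow step that also passes `-Ds3.bucket` for table storage.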
@@ -37,6 +37,7 @@ public enum IcebergErrorCode
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
ICEBERG_COMMIT_ERROR(12, EXTERNAL),
ICEBERG_CATALOG_ERROR(13, EXTERNAL),
/**/;

private final ErrorCode errorCode;
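The new `ICEBERG_CATALOG_ERROR` is classified as `EXTERNAL`, so a failure returned by a remote catalog service such as Glue is reported as an environmental problem rather than an engine bug. A minimal sketch of wrapping a catalog call with the new code; the helper and message are illustrative:

```java
import com.amazonaws.AmazonServiceException;
import io.trino.spi.TrinoException;

import java.util.function.Supplier;

import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;

// Illustrative helper: surface an AWS service failure as an EXTERNAL error
// instead of letting the raw SDK exception escape.
public final class CatalogErrorExample
{
    private CatalogErrorExample() {}

    public static <T> T wrapGlueCall(Supplier<T> glueCall, String description)
    {
        try {
            return glueCall.get();
        }
        catch (AmazonServiceException e) {
            throw new TrinoException(ICEBERG_CATALOG_ERROR, "Glue request failed: " + description, e);
        }
    }
}
```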
@@ -25,7 +25,6 @@
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
@@ -66,7 +65,6 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);

binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON);
binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
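The `TrinoCatalogFactory` binding removed here does not disappear: it moves into `IcebergCatalogModule` (below), where each backend binds its own factory, such as `TrinoHiveCatalogFactory` for the Hive metastore path.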
@@ -18,7 +18,6 @@
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
@@ -125,10 +124,10 @@ public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table)
return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
}

public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
public static Table loadIcebergTable(TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
{
TableOperations operations = tableOperationsProvider.createTableOperations(
metastore,
catalog,
session,
table.getSchemaName(),
table.getTableName(),
@@ -138,14 +137,14 @@ public static Table getIcebergTableWithMetadata(
}

public static Table getIcebergTableWithMetadata(
HiveMetastore metastore,
TrinoCatalog catalog,
IcebergTableOperationsProvider tableOperationsProvider,
ConnectorSession session,
SchemaTableName table,
TableMetadata tableMetadata)
{
IcebergTableOperations operations = tableOperationsProvider.createTableOperations(
metastore,
catalog,
session,
table.getSchemaName(),
table.getTableName(),
@@ -237,7 +236,7 @@ public static Optional<String> getTableComment(Table table)
return Optional.ofNullable(table.properties().get(TABLE_COMMENT));
}

private static String quotedTableName(SchemaTableName name)
public static String quotedTableName(SchemaTableName name)
{
return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
}
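Call sites change in step: where a `HiveMetastore` used to be threaded through, the `TrinoCatalog` abstraction is passed instead, so one loading path serves Hive-, file-, and Glue-backed catalogs alike. A hedged sketch of a caller, assuming the helpers above belong to `IcebergUtil` (consistent with the method names shown, though the file is not named in this excerpt):

```java
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.Table;

import java.util.Optional;

import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;

public final class LoadTableExample
{
    private LoadTableExample() {}

    public static Optional<String> tableComment(
            TrinoCatalog catalog,
            IcebergTableOperationsProvider tableOperationsProvider,
            ConnectorSession session,
            SchemaTableName name)
    {
        // The catalog (Hive-, file-, or Glue-backed) replaces the old
        // HiveMetastore argument; everything downstream stays the same.
        Table table = loadIcebergTable(catalog, tableOperationsProvider, session, name);
        return getTableComment(table);
    }
}
```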
@@ -0,0 +1,118 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog;

import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.Transactions.createTableTransaction;

public abstract class AbstractTrinoCatalog
implements TrinoCatalog
{
protected final IcebergTableOperationsProvider tableOperationsProvider;
private final boolean useUniqueTableLocation;

protected AbstractTrinoCatalog(
IcebergTableOperationsProvider tableOperationsProvider,
boolean useUniqueTableLocation)
{
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.useUniqueTableLocation = useUniqueTableLocation;
}

@Override
public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional<String> comment)
{
Table icebergTable = loadTable(session, schemaTableName);
if (comment.isEmpty()) {
icebergTable.updateProperties().remove(TABLE_COMMENT).commit();
}
else {
icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit();
}
}

@Override
public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment)
{
Table icebergTable = loadTable(session, schemaTableName);
icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit();
}

protected Transaction newCreateTableTransaction(
ConnectorSession session,
SchemaTableName schemaTableName,
Schema schema,
PartitionSpec partitionSpec,
String location,
Map<String, String> properties,
Optional<String> owner)
{
TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties);
TableOperations ops = tableOperationsProvider.createTableOperations(
this,
session,
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
owner,
Optional.of(location));
return createTableTransaction(schemaTableName.toString(), ops, metadata);
}

protected String createNewTableName(String baseTableName)
{
String tableName = baseTableName;
if (useUniqueTableLocation) {
tableName += "-" + randomUUID().toString().replace("-", "");
}
return tableName;
}

protected void deleteTableDirectory(
ConnectorSession session,
SchemaTableName schemaTableName,
HdfsEnvironment hdfsEnvironment,
Path tableLocation)
{
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), tableLocation);
fileSystem.delete(tableLocation, true);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("Failed to delete directory %s of the table %s", tableLocation, schemaTableName), e);
}
}
}
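One detail worth calling out in the new base class is `createNewTableName`: with unique table locations enabled, the storage name gets a random suffix, so re-creating a dropped table never collides with files left behind at the old location. A standalone illustration of the naming scheme (the class here is illustrative; the logic mirrors the method above):

```java
import static java.util.UUID.randomUUID;

public final class UniqueTableNameExample
{
    private UniqueTableNameExample() {}

    // Mirror of createNewTableName: append a UUID with its own dashes
    // stripped, so each CREATE TABLE gets a distinct storage directory.
    public static String uniqueName(String baseTableName)
    {
        return baseTableName + "-" + randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args)
    {
        System.out.println(uniqueName("orders")); // e.g. orders-b0f7c1e2a9d34c5f8e6a7b8c9d0e1f2a
    }
}
```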
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg.catalog;

import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
@@ -26,13 +27,14 @@
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;

import javax.inject.Inject;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory;

import java.util.Optional;

import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.trino.plugin.iceberg.CatalogType.GLUE;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE;
import static java.util.Objects.requireNonNull;
@@ -52,16 +54,16 @@ protected void setup(Binder binder)
{
if (metastore.isPresent()) {
binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore.get()));
binder.bind(MetastoreValidator.class).asEagerSingleton();
install(new DecoratedHiveMetastoreModule());
binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON);
binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON);
}
else {
bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule());
bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule());
// TODO add support for Glue metastore
bindCatalogModule(GLUE, new IcebergGlueCatalogModule());
}

binder.bind(MetastoreValidator.class).asEagerSingleton();
install(new DecoratedHiveMetastoreModule());
}

public static class MetastoreValidator
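The Glue module is installed through the same `bindCatalogModule` dispatch as the Hive and file-metastore modules, so exactly one backend is active per catalog; with this wiring, setting `iceberg.catalog.type=glue` in the catalog properties selects Glue. The helper's body falls outside this excerpt; a sketch of what it plausibly looks like, modeled on the `conditionalModule` import shown above and assuming `IcebergConfig` exposes the configured `CatalogType`:

```java
import com.google.inject.Binder;
import com.google.inject.Module;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.iceberg.CatalogType;
import io.trino.plugin.iceberg.IcebergConfig;

import static io.airlift.configuration.ConditionalModule.conditionalModule;

public class CatalogModuleDispatchSketch
        extends AbstractConfigurationAwareModule
{
    @Override
    protected void setup(Binder binder) {}

    // Assumption: install the given module only when the configured
    // catalog type matches, so only one catalog backend gets bound.
    private void bindCatalogModule(CatalogType catalogType, Module module)
    {
        install(conditionalModule(
                IcebergConfig.class,
                config -> config.getCatalogType() == catalogType,
                module));
    }
}
```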
@@ -13,15 +13,14 @@
*/
package io.trino.plugin.iceberg.catalog;

import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.connector.ConnectorSession;

import java.util.Optional;

public interface IcebergTableOperationsProvider
{
IcebergTableOperations createTableOperations(
HiveMetastore hiveMetastore,
TrinoCatalog catalog,
ConnectorSession session,
String database,
String table,
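Each backend now ships a provider implementation against this reshaped interface. A hedged skeleton of a Glue-flavored provider; the class name, the trailing `owner`/`location` parameters (inferred from the `AbstractTrinoCatalog` call site above, since the interface is truncated here), and the body are all assumptions, with the real implementation living in the new `catalog/glue` package:

```java
import io.trino.spi.connector.ConnectorSession;

import java.util.Optional;

// Hypothetical skeleton, not the commit's actual class.
public class ExampleGlueTableOperationsProvider
        implements IcebergTableOperationsProvider
{
    @Override
    public IcebergTableOperations createTableOperations(
            TrinoCatalog catalog,
            ConnectorSession session,
            String database,
            String table,
            Optional<String> owner,
            Optional<String> location)
    {
        // A real implementation would recover its concrete catalog type here
        // (to reach the AWSGlueAsync client) and build Glue-specific
        // IcebergTableOperations for the given table.
        throw new UnsupportedOperationException("illustrative skeleton only");
    }
}
```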