From 83e90c05a9a8551efd90d63aa899ea6191458d9f Mon Sep 17 00:00:00 2001
From: Jack Ye
Date: Thu, 14 Oct 2021 22:53:03 -0700
Subject: [PATCH] Add Iceberg Glue and Hadoop catalog support

---
 .../metastore/glue/GlueHiveMetastore.java     |   2 +-
 .../metastore/thrift/ThriftHiveMetastore.java |  31 +-
 .../metastore/thrift/ThriftMetastore.java     |  10 +
 .../metastore/thrift/ThriftMetastoreUtil.java |   2 +-
 plugin/trino-iceberg/pom.xml                  |  10 +
 .../plugin/iceberg/HiveTableOperations.java   | 350 --------
 .../trino/plugin/iceberg/IcebergConfig.java   |  15 +
 .../plugin/iceberg/IcebergErrorCode.java      |   2 +
 .../trino/plugin/iceberg/IcebergMetadata.java |   3 +-
 .../iceberg/IcebergMetadataFactory.java       |   1 +
 .../trino/plugin/iceberg/IcebergModule.java   |   4 -
 .../io/trino/plugin/iceberg/IcebergUtil.java  |  38 +-
 .../InternalIcebergConnectorFactory.java      |   2 +-
 .../iceberg/RollbackToSnapshotProcedure.java  |   2 +
 .../AbstractMetastoreTableOperations.java     | 153 ++++
 .../catalog/AbstractTableOperations.java      | 243 ++++++
 .../IcebergCatalogModule.java}                |  28 +-
 .../catalog/IcebergTableOperations.java       |  23 +
 .../IcebergTableOperationsProvider.java       |  28 ++
 .../iceberg/{ => catalog}/TrinoCatalog.java   |   3 +-
 .../iceberg/catalog/TrinoCatalogFactory.java  |  19 +
 .../file/FileMetastoreTableOperations.java    |  86 ++++
 ...FileMetastoreTableOperationsProvider.java} |  29 +-
 .../IcebergFileMetastoreCatalogModule.java    |  34 ++
 .../catalog/glue/GlueTableOperations.java     | 171 +++++
 .../glue/GlueTableOperationsProvider.java     |  70 +++
 .../glue/IcebergGlueCatalogModule.java        |  33 ++
 .../catalog/glue/TrinoGlueCatalog.java        | 437 ++++++++++++++++++
 .../catalog/glue/TrinoGlueCatalogFactory.java |  64 +++
 .../catalog/hadoop/HadoopTableOperations.java |   2 +
 .../hadoop/HadoopTableOperationsProvider.java |   2 +
 .../hadoop/IcebergHadoopCatalogModule.java    |  33 ++
 .../catalog/hadoop/TrinoHadoopCatalog.java    |   2 +
 .../hadoop/TrinoHadoopCatalogFactory.java     |   2 +
 .../hms/HiveMetastoreTableOperations.java     | 107 +++++
 .../HiveMetastoreTableOperationsProvider.java |  63 +++
 .../IcebergHiveMetastoreCatalogModule.java    |  33 ++
 .../{ => catalog/hms}/TrinoHiveCatalog.java   |  30 +-
 .../hms/TrinoHiveCatalogFactory.java}         |  44 +-
 .../plugin/iceberg/TestIcebergConfig.java     |   7 +-
 .../plugin/iceberg/TestIcebergPlugin.java     |  13 +-
 .../trino/plugin/iceberg/TestIcebergV2.java   |   6 +-
 testing/trino-product-tests/pom.xml           |   5 +
 .../product/iceberg/TestIcebergInsert.java    |  92 ++++
 44 files changed, 1873 insertions(+), 461 deletions(-)
 delete mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTableOperations.java
 rename plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/{IcebergMetastoreModule.java => catalog/IcebergCatalogModule.java} (61%)
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperations.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java
 rename plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/{ => catalog}/TrinoCatalog.java (98%)
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
 rename plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/{HiveTableOperationsProvider.java => catalog/file/FileMetastoreTableOperationsProvider.java} (57%)
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperationsProvider.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperations.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperationsProvider.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/IcebergHadoopCatalogModule.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalog.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalogFactory.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java
 rename plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/{ => catalog/hms}/TrinoHiveCatalog.java (96%)
 rename plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/{TrinoCatalogFactory.java => catalog/hms/TrinoHiveCatalogFactory.java} (58%)
 create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
index 4f4973e3d8ee..2d8c7f897f52 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
@@ -200,7 +200,7 @@ public GlueHiveMetastore(
         this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats);
     }
 
-    private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
+    public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional<RequestHandler2> requestHandler, RequestMetricCollector metricsCollector)
     {
         ClientConfiguration clientConfig = new ClientConfiguration()
                 .withMaxConnections(config.getMaxGlueConnections())
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java
index b78123abc2f5..a607c248a9ea 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java
@@ -71,9 +71,11 @@
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -1666,7 +1668,23 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
             request.addLockComponent(createLockComponentForOperation(partition.getTableName(), operation, isDynamicPartitionWrite, Optional.of(partition.getPartitionId())));
         }
 
-        LockRequest lockRequest = request.build();
+        acquireLock(identity, format("hive transaction %s for query %s", transactionId, queryId), request.build());
+    }
+
+    @Override
+    public long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName)
+    {
+        LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, dbName);
+        lockComponent.setTablename(tableName);
+        LockRequest lockRequest = new LockRequestBuilder(queryId)
+                .addLockComponent(lockComponent)
+                .setUser(identity.getUsername().get())
+                .build();
+        return acquireLock(identity, format("query %s", queryId), lockRequest);
+    }
+
+    private long acquireLock(HiveIdentity identity, String context, LockRequest lockRequest)
+    {
         try {
             LockResponse response = retry()
                     .stopOn(NoSuchTxnException.class, TxnAbortedException.class, MetaException.class)
@@ -1681,10 +1699,10 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
             while (response.getState() == LockState.WAITING) {
                 if (Duration.nanosSince(waitStart).compareTo(maxWaitForLock) > 0) {
                     // timed out
-                    throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId)));
+                    throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d for %s", lockId, context)));
                 }
 
-                log.debug("Waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId);
+                log.debug("Waiting for lock %d for %s", lockId, context);
 
                 response = retry()
                         .stopOn(NoSuchTxnException.class, NoSuchLockException.class, TxnAbortedException.class, MetaException.class)
@@ -1698,6 +1716,8 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
             if (response.getState() != LockState.ACQUIRED) {
                 throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, "Could not acquire lock. Lock in state " + response.getState()));
             }
+
+            return response.getLockid();
         }
         catch (TException e) {
             throw new TrinoException(HIVE_METASTORE_ERROR, e);
@@ -1710,7 +1730,7 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
     private <T extends Exception> T unlockSuppressing(HiveIdentity identity, long lockId, T exception)
     {
         try {
-            unlockTableLock(identity, lockId);
+            releaseTableLock(identity, lockId);
         }
         catch (RuntimeException e) {
             exception.addSuppressed(e);
@@ -1718,7 +1738,8 @@ private <T extends Exception> T unlockSuppressing(HiveIdentity identity, long lo
         return exception;
     }
 
-    private void unlockTableLock(HiveIdentity identity, long lockId)
+    @Override
+    public void releaseTableLock(HiveIdentity identity, long lockId)
     {
         try {
             retry()
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java
index 8aaa7cedd18c..cb9aedbd49c2 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastore.java
@@ -172,6 +172,16 @@ default void acquireTableWriteLock(HiveIdentity identity, String queryId, long t
         throw new UnsupportedOperationException();
     }
 
+    default long acquireTableExclusiveLock(HiveIdentity identity, String queryId, String dbName, String tableName)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    default void releaseTableLock(HiveIdentity identity, long lockId)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     default void updateTableWriteId(HiveIdentity identity, String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange)
     {
         throw new UnsupportedOperationException();
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
index 9d0f6076a145..2291fb157bc7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
@@ -173,7 +173,7 @@ public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Tab
         return result;
     }
 
-    static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
+    public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
     {
         org.apache.hadoop.hive.metastore.api.Table result = new org.apache.hadoop.hive.metastore.api.Table();
         result.setDbName(table.getDatabaseName());
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index e11f65cdc9ad..77872eb603ed 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -84,6 +84,16 @@
             <artifactId>units</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-glue</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
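
Usage sketch (illustration, not part of the patch): the new acquireTableExclusiveLock/releaseTableLock pair on ThriftMetastore is meant to bracket an Iceberg metastore commit, with the release in a finally block so a failed commit cannot leak the lock. Assuming an existing ThriftMetastore "metastore", a HiveIdentity "identity", and a query id, a caller would look like:

    long lockId = metastore.acquireTableExclusiveLock(identity, queryId, "iceberg_db", "orders");
    try {
        // perform the metadata_location compare-and-swap under the exclusive lock
    }
    finally {
        // always release, even when the commit fails
        metastore.releaseTableLock(identity, lockId);
    }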
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java
deleted file mode 100644
index 9f908ecd25a4..000000000000
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.iceberg;
-
-import io.airlift.log.Logger;
-import io.trino.plugin.hive.authentication.HiveIdentity;
-import io.trino.plugin.hive.metastore.Column;
-import io.trino.plugin.hive.metastore.HiveMetastore;
-import io.trino.plugin.hive.metastore.PrincipalPrivileges;
-import io.trino.plugin.hive.metastore.StorageFormat;
-import io.trino.plugin.hive.metastore.Table;
-import io.trino.spi.TrinoException;
-import io.trino.spi.connector.SchemaTableName;
-import io.trino.spi.connector.TableNotFoundException;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.exceptions.CommitFailedException;
-import org.apache.iceberg.hive.HiveSchemaUtil;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.types.Types.NestedField;
-import org.apache.iceberg.util.Tasks;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
-import static io.trino.plugin.hive.HiveType.toHiveType;
-import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView;
-import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
-import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
-import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
-import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
-import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
-import static java.lang.Integer.parseInt;
-import static java.lang.String.format;
-import static java.util.Objects.requireNonNull;
-import static java.util.UUID.randomUUID;
-import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
-import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
-import static org.apache.iceberg.TableMetadataParser.getFileExtension;
-import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION;
-import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
-
-@NotThreadSafe
-public class HiveTableOperations
-        implements TableOperations
-{
-    private static final Logger log = Logger.get(HiveTableOperations.class);
-
-    public static final String METADATA_LOCATION = "metadata_location";
-    public static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location";
-    private static final String METADATA_FOLDER_NAME = "metadata";
-
-    private static final StorageFormat STORAGE_FORMAT = StorageFormat.create(
-            LazySimpleSerDe.class.getName(),
-            FileInputFormat.class.getName(),
-            FileOutputFormat.class.getName());
-
-    private final HiveMetastore metastore;
-    private final HiveIdentity identity;
-    private final String database;
-    private final String tableName;
-    private final Optional<String> owner;
-    private final Optional<String> location;
-    private final FileIO fileIo;
-
-    private TableMetadata currentMetadata;
-    private String currentMetadataLocation;
-    private boolean shouldRefresh = true;
-    private int version = -1;
-
-    HiveTableOperations(FileIO fileIo, HiveMetastore metastore, HiveIdentity identity, String database, String table, Optional<String> owner, Optional<String> location)
-    {
-        this.fileIo = requireNonNull(fileIo, "fileIo is null");
-        this.metastore = requireNonNull(metastore, "metastore is null");
-        this.identity = requireNonNull(identity, "identity is null");
-        this.database = requireNonNull(database, "database is null");
-        this.tableName = requireNonNull(table, "table is null");
-        this.owner = requireNonNull(owner, "owner is null");
-        this.location = requireNonNull(location, "location is null");
-    }
-
-    public void initializeFromMetadata(TableMetadata tableMetadata)
-    {
-        checkState(currentMetadata == null, "already initialized");
-        currentMetadata = tableMetadata;
-        currentMetadataLocation = tableMetadata.metadataFileLocation();
-        shouldRefresh = false;
-        version = parseVersion(currentMetadataLocation);
-    }
-
-    @Override
-    public TableMetadata current()
-    {
-        if (shouldRefresh) {
-            return refresh();
-        }
-        return currentMetadata;
-    }
-
-    @Override
-    public TableMetadata refresh()
-    {
-        if (location.isPresent()) {
-            refreshFromMetadataLocation(null);
-            return currentMetadata;
-        }
-
-        Table table = getTable();
-
-        if (isPrestoView(table) && isHiveOrPrestoView(table)) {
-            // this is a Hive view, hence not a table
-            throw new TableNotFoundException(getSchemaTableName());
-        }
-        if (!isIcebergTable(table)) {
-            throw new UnknownTableTypeException(getSchemaTableName());
-        }
-
-        String metadataLocation = table.getParameters().get(METADATA_LOCATION);
-        if (metadataLocation == null) {
-            throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName()));
-        }
-
-        refreshFromMetadataLocation(metadataLocation);
-
-        return currentMetadata;
-    }
-
-    @Override
-    public void commit(@Nullable TableMetadata base, TableMetadata metadata)
-    {
-        requireNonNull(metadata, "metadata is null");
-
-        // if the metadata is already out of date, reject it
-        if (!Objects.equals(base, current())) {
-            throw new CommitFailedException("Cannot commit: stale table metadata for %s", getSchemaTableName());
-        }
-
-        // if the metadata is not changed, return early
-        if (Objects.equals(base, metadata)) {
-            return;
-        }
-
-        String newMetadataLocation = writeNewMetadata(metadata, version + 1);
-
-        // TODO: use metastore locking
-
-        Table table;
-        try {
-            if (base == null) {
-                Table.Builder builder = Table.builder()
-                        .setDatabaseName(database)
-                        .setTableName(tableName)
-                        .setOwner(owner.orElseThrow(() -> new IllegalStateException("Owner not set")))
-                        .setTableType(TableType.EXTERNAL_TABLE.name())
-                        .setDataColumns(toHiveColumns(metadata.schema().columns()))
-                        .withStorage(storage -> storage.setLocation(metadata.location()))
-                        .withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT))
-                        .setParameter("EXTERNAL", "TRUE")
-                        .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
-                        .setParameter(METADATA_LOCATION, newMetadataLocation);
-                String tableComment = metadata.properties().get(TABLE_COMMENT);
-                if (tableComment != null) {
-                    builder.setParameter(TABLE_COMMENT, tableComment);
-                }
-                table = builder.build();
-            }
-            else {
-                Table currentTable = getTable();
-
-                checkState(currentMetadataLocation != null, "No current metadata location for existing table");
-                String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
-                if (!currentMetadataLocation.equals(metadataLocation)) {
-                    throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
-                            currentMetadataLocation, metadataLocation, getSchemaTableName());
-                }
-
-                table = Table.builder(currentTable)
-                        .setDataColumns(toHiveColumns(metadata.schema().columns()))
-                        .withStorage(storage -> storage.setLocation(metadata.location()))
-                        .setParameter(METADATA_LOCATION, newMetadataLocation)
-                        .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
-                        .build();
-            }
-        }
-        catch (RuntimeException e) {
-            try {
-                io().deleteFile(newMetadataLocation);
-            }
-            catch (RuntimeException ex) {
-                e.addSuppressed(ex);
-            }
-            throw e;
-        }
-
-        PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
-        if (base == null) {
-            metastore.createTable(identity, table, privileges);
-        }
-        else {
-            metastore.replaceTable(identity, database, tableName, table, privileges);
-        }
-
-        shouldRefresh = true;
-    }
-
-    @Override
-    public FileIO io()
-    {
-        return fileIo;
-    }
-
-    @Override
-    public String metadataFileLocation(String filename)
-    {
-        TableMetadata metadata = current();
-        String location;
-        if (metadata != null) {
-            String writeLocation = metadata.properties().get(WRITE_METADATA_LOCATION);
-            if (writeLocation != null) {
-                return format("%s/%s", writeLocation, filename);
-            }
-            location = metadata.location();
-        }
-        else {
-            location = this.location.orElseThrow(() -> new IllegalStateException("Location not set"));
-        }
-        return format("%s/%s/%s", location, METADATA_FOLDER_NAME, filename);
-    }
-
-    @Override
-    public LocationProvider locationProvider()
-    {
-        TableMetadata metadata = current();
-        return getLocationProvider(getSchemaTableName(), metadata.location(), metadata.properties());
-    }
-
-    private Table getTable()
-    {
-        return metastore.getTable(identity, database, tableName)
-                .orElseThrow(() -> new TableNotFoundException(getSchemaTableName()));
-    }
-
-    private SchemaTableName getSchemaTableName()
-    {
-        return new SchemaTableName(database, tableName);
-    }
-
-    private String writeNewMetadata(TableMetadata metadata, int newVersion)
-    {
-        String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
-        OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
-
-        // write the new metadata
-        TableMetadataParser.write(metadata, newMetadataLocation);
-
-        return newTableMetadataFilePath;
-    }
-
-    private void refreshFromMetadataLocation(String newLocation)
-    {
-        // use null-safe equality check because new tables have a null metadata location
-        if (Objects.equals(currentMetadataLocation, newLocation)) {
-            shouldRefresh = false;
-            return;
-        }
-
-        AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
-        Tasks.foreach(newLocation)
-                .retry(20)
-                .exponentialBackoff(100, 5000, 600000, 4.0)
-                .run(metadataLocation -> newMetadata.set(
-                        TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
-
-        String newUUID = newMetadata.get().uuid();
-        if (currentMetadata != null) {
-            checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
-                    "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
-        }
-
-        currentMetadata = newMetadata.get();
-        currentMetadataLocation = newLocation;
-        version = parseVersion(newLocation);
-        shouldRefresh = false;
-    }
-
-    private static String newTableMetadataFilePath(TableMetadata meta, int newVersion)
-    {
-        String codec = meta.property(METADATA_COMPRESSION, METADATA_COMPRESSION_DEFAULT);
-        return metadataFileLocation(meta, format("%05d-%s%s", newVersion, randomUUID(), getFileExtension(codec)));
-    }
-
-    private static String metadataFileLocation(TableMetadata metadata, String filename)
-    {
-        String location = metadata.properties().get(WRITE_METADATA_LOCATION);
-        if (location != null) {
-            return format("%s/%s", location, filename);
-        }
-        return format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
-    }
-
-    private static int parseVersion(String metadataLocation)
-    {
-        int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
-        int versionEnd = metadataLocation.indexOf('-', versionStart);
-        try {
-            return parseInt(metadataLocation.substring(versionStart, versionEnd));
-        }
-        catch (NumberFormatException | IndexOutOfBoundsException e) {
-            log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
-            return -1;
-        }
-    }
-
-    private static List<Column> toHiveColumns(List<NestedField> columns)
-    {
-        return columns.stream()
-                .map(column -> new Column(
-                        column.name(),
-                        toHiveType(HiveSchemaUtil.convert(column.type())),
-                        Optional.empty()))
-                .collect(toImmutableList());
-    }
-}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index 079cb3595487..1ee209a5ef50 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -16,6 +16,7 @@
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
 import io.trino.plugin.hive.HiveCompressionCodec;
+import io.trino.spi.function.Description;
 import org.apache.iceberg.FileFormat;
 
 import javax.validation.constraints.Min;
@@ -33,6 +34,7 @@ public class IcebergConfig
     private int maxPartitionsPerWriter = 100;
     private boolean uniqueTableLocation;
     private CatalogType catalogType = HIVE_METASTORE;
+    private String catalogWarehouse;
 
     public CatalogType getCatalogType()
     {
@@ -46,6 +48,19 @@ public IcebergConfig setCatalogType(CatalogType catalogType)
         return this;
     }
 
+    @Config("iceberg.catalog.warehouse")
+    @Description("Iceberg default warehouse location, used to generate default table location")
+    public IcebergConfig setCatalogWarehouse(String warehouse)
+    {
+        this.catalogWarehouse = warehouse;
+        return this;
+    }
+
+    public String getCatalogWarehouse()
+    {
+        return catalogWarehouse;
+    }
+
     @NotNull
     public FileFormat getFileFormat()
     {
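
For orientation (not part of the patch): together with the GLUE and HADOOP constants referenced later in this patch, the new iceberg.catalog.warehouse property is what a catalog file would set to control where default table locations are generated. An assumed example, using the enum-constant spelling for the type value:

    connector.name=iceberg
    iceberg.catalog.type=GLUE
    iceberg.catalog.warehouse=s3://example-bucket/warehouse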
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java
index 46dcf0acc977..f32fddcfd2fb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java
@@ -36,6 +36,8 @@ public enum IcebergErrorCode
     ICEBERG_CURSOR_ERROR(9, EXTERNAL),
     ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
     ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
+    ICEBERG_CATALOG_ERROR(12, EXTERNAL),
+    ICEBERG_COMMIT_ERROR(13, EXTERNAL),
     /**/;
 
     private final ErrorCode errorCode;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 5b4956eeac31..d11c9184e66c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -23,6 +23,7 @@
 import io.airlift.slice.Slice;
 import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
 import io.trino.plugin.hive.HiveWrittenPartitions;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.CatalogSchemaName;
 import io.trino.spi.connector.CatalogSchemaTableName;
@@ -100,9 +101,9 @@
 import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction;
 import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
 import static io.trino.plugin.iceberg.TableType.DATA;
-import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES;
 import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
 import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
+import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.DEPENDS_ON_TABLES;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static java.util.Collections.singletonList;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
index 47d99fc2ace5..79f129029e23 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.iceberg;
 
 import io.airlift.json.JsonCodec;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.spi.type.TypeManager;
 
 import javax.inject.Inject;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
index 7ba247f1e197..cad3e8e214b2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
@@ -66,7 +66,6 @@ public void configure(Binder binder)
         configBinder(binder).bindConfig(ParquetReaderConfig.class);
         configBinder(binder).bindConfig(ParquetWriterConfig.class);
 
-        binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON);
         binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);
 
         jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
@@ -74,9 +73,6 @@ public void configure(Binder binder)
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
 
-        // TODO inject table operations based on IcebergConfig.getCatalogType
-        binder.bind(HiveTableOperationsProvider.class).in(Scopes.SINGLETON);
-
         binder.bind(IcebergFileWriterFactory.class).in(Scopes.SINGLETON);
         newExporter(binder).export(IcebergFileWriterFactory.class).withGeneratedName();
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
index 555bbb062eae..dcb0d9fd1fd2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
@@ -16,9 +16,10 @@
 import com.google.common.collect.ImmutableMap;
 import io.airlift.slice.Slice;
 import io.airlift.slice.SliceUtf8;
-import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
@@ -104,11 +105,14 @@
 import static org.apache.iceberg.LocationProviders.locationsFor;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
 import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
+import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
+import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
 import static org.apache.iceberg.types.Type.TypeID.BINARY;
 import static org.apache.iceberg.types.Type.TypeID.FIXED;
 
-final class IcebergUtil
+public final class IcebergUtil
 {
     private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");
 
@@ -119,13 +123,10 @@ public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table)
         return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
     }
 
-    public static Table loadIcebergTable(HiveMetastore metastore, HiveTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
+    public static Table loadIcebergTable(IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
     {
         TableOperations operations = tableOperationsProvider.createTableOperations(
-                metastore,
-                new HdfsContext(session),
-                session.getQueryId(),
-                new HiveIdentity(session),
+                session,
                 table.getSchemaName(),
                 table.getTableName(),
                 Optional.empty(),
@@ -135,16 +136,13 @@ public static Table loadIcebergTable(HiveMetastore metastore, HiveTableOperation
 
     public static Table getIcebergTableWithMetadata(
             HiveMetastore metastore,
-            HiveTableOperationsProvider tableOperationsProvider,
+            IcebergTableOperationsProvider tableOperationsProvider,
             ConnectorSession session,
             SchemaTableName table,
             TableMetadata tableMetadata)
     {
-        HiveTableOperations operations = (HiveTableOperations) tableOperationsProvider.createTableOperations(
-                metastore,
-                new HdfsContext(session),
-                session.getQueryId(),
-                new HiveIdentity(session),
+        IcebergTableOperations operations = tableOperationsProvider.createTableOperations(
+                session,
                 table.getSchemaName(),
                 table.getTableName(),
                 Optional.empty(),
@@ -224,7 +222,7 @@ public static Optional<String> getTableComment(Table table)
         return Optional.ofNullable(table.properties().get(TABLE_COMMENT));
     }
 
-    private static String quotedTableName(SchemaTableName name)
+    public static String quotedTableName(SchemaTableName name)
     {
         return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
     }
@@ -388,4 +386,14 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec
         return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.build());
     }
+
+    public static void validateTableCanBeDropped(Table table, SchemaTableName schemaTableName)
+    {
+        // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
+        if (table.properties().containsKey(OBJECT_STORE_PATH) ||
+                table.properties().containsKey(WRITE_NEW_DATA_LOCATION) ||
+                table.properties().containsKey(WRITE_METADATA_LOCATION)) {
+            throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino");
+        }
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java
index dfe0a6329b70..d1fce63e67d7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java
@@ -77,7 +77,7 @@ public static Connector createConnector(
                 new ConnectorObjectNameGeneratorModule(catalogName, "io.trino.plugin.iceberg", "trino.plugin.iceberg"),
                 new JsonModule(),
                 new IcebergModule(),
-                new IcebergMetastoreModule(metastore),
+                new IcebergCatalogModule(metastore),
                 new HiveHdfsModule(),
                 new HiveS3Module(),
                 new HiveGcsModule(),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java
index 6bd8fe306428..bab13b82c09b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java
@@ -14,6 +14,8 @@
 package io.trino.plugin.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.procedure.Procedure;
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java
new file mode 100644
index 000000000000..89a1b7e87413
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog;
+
+import io.trino.plugin.hive.authentication.HiveIdentity;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.PrincipalPrivileges;
+import io.trino.plugin.hive.metastore.StorageFormat;
+import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.iceberg.UnknownTableTypeException;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.TableNotFoundException;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types.NestedField;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveType.toHiveType;
+import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView;
+import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
+import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
+import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+
+@NotThreadSafe
+public abstract class AbstractMetastoreTableOperations
+        extends AbstractTableOperations
+{
+    protected static final StorageFormat STORAGE_FORMAT = StorageFormat.create(
+            LazySimpleSerDe.class.getName(),
+            FileInputFormat.class.getName(),
+            FileOutputFormat.class.getName());
+
+    protected final HiveMetastore metastore;
+
+    protected AbstractMetastoreTableOperations(
+            FileIO fileIo,
+            HiveMetastore metastore,
+            ConnectorSession session,
+            String database,
+            String table,
+            Optional<String> owner,
+            Optional<String> location)
+    {
+        super(fileIo, session, database, table, owner, location);
+        this.metastore = requireNonNull(metastore, "metastore is null");
+    }
+
+    @Override
+    protected String getRefreshedLocation()
+    {
+        Table table = getTable();
+
+        if (isPrestoView(table) && isHiveOrPrestoView(table)) {
+            // this is a Hive view, hence not a table
+            throw new TableNotFoundException(getSchemaTableName());
+        }
+        if (!isIcebergTable(table)) {
+            throw new UnknownTableTypeException(getSchemaTableName());
+        }
+
+        String metadataLocation = table.getParameters().get(METADATA_LOCATION);
+        if (metadataLocation == null) {
+            throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName()));
+        }
+        return metadataLocation;
+    }
+
+    @Override
+    protected void commitNewTable(TableMetadata metadata)
+    {
+        String newMetadataLocation = writeNewMetadata(metadata, version + 1);
+
+        Table table;
+        try {
+            Table.Builder builder = Table.builder()
+                    .setDatabaseName(database)
+                    .setTableName(tableName)
+                    .setOwner(owner.orElseThrow(() -> new IllegalStateException("Owner not set")))
+                    .setTableType(TableType.EXTERNAL_TABLE.name())
+                    .setDataColumns(toHiveColumns(metadata.schema().columns()))
+                    .withStorage(storage -> storage.setLocation(metadata.location()))
+                    .withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT))
+                    .setParameter("EXTERNAL", "TRUE")
+                    .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
+                    .setParameter(METADATA_LOCATION, newMetadataLocation);
+            String tableComment = metadata.properties().get(TABLE_COMMENT);
+            if (tableComment != null) {
+                builder.setParameter(TABLE_COMMENT, tableComment);
+            }
+            table = builder.build();
+        }
+        catch (RuntimeException e) {
+            try {
+                io().deleteFile(newMetadataLocation);
+            }
+            catch (RuntimeException ex) {
+                e.addSuppressed(ex);
+            }
+            throw e;
+        }
+
+        PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
+        HiveIdentity identity = new HiveIdentity(session);
+        metastore.createTable(identity, table, privileges);
+    }
+
+    protected abstract void commitToExistingTable(TableMetadata base, TableMetadata metadata);
+
+    protected Table getTable()
+    {
+        return metastore.getTable(new HiveIdentity(session), database, tableName)
+                .orElseThrow(() -> new TableNotFoundException(getSchemaTableName()));
+    }
+
+    protected static List<Column> toHiveColumns(List<NestedField> columns)
+    {
+        return columns.stream()
+                .map(column -> new Column(
+                        column.name(),
+                        toHiveType(HiveSchemaUtil.convert(column.type())),
+                        Optional.empty()))
+                .collect(toImmutableList());
+    }
+}
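
To see how the new abstraction composes (illustrative sketch, not part of the patch): table loading now flows through IcebergTableOperationsProvider rather than a hard-coded HiveTableOperations, mirroring the updated IcebergUtil.loadIcebergTable above. The "provider" variable and the name literals are assumed, and the final BaseTable construction is an assumption about what loadIcebergTable resolves to:

    IcebergTableOperations operations = provider.createTableOperations(
            session,           // ConnectorSession; replaces the old metastore/HdfsContext/identity arguments
            "analytics",       // database
            "orders",          // table
            Optional.empty(),  // owner, only needed when creating a table
            Optional.empty()); // explicit location, only set at creation time
    Table table = new BaseTable(operations, quotedTableName(new SchemaTableName("analytics", "orders")));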
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTableOperations.java
new file mode 100644
index 000000000000..24692f95a58c
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTableOperations.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog;
+
+import io.airlift.log.Logger;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.SchemaTableName;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.Tasks;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.UUID.randomUUID;
+import static org.apache.iceberg.TableMetadataParser.getFileExtension;
+import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION;
+import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
+
+public abstract class AbstractTableOperations
+        implements IcebergTableOperations
+{
+    private static final Logger log = Logger.get(AbstractTableOperations.class);
+
+    public static final String METADATA_LOCATION = "metadata_location";
+    public static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location";
+    protected static final String METADATA_FOLDER_NAME = "metadata";
+
+    protected final ConnectorSession session;
+    protected final String database;
+    protected final String tableName;
+    protected final Optional<String> owner;
+    protected final Optional<String> location;
+    protected final FileIO fileIo;
+
+    protected TableMetadata currentMetadata;
+    protected String currentMetadataLocation;
+    protected boolean shouldRefresh = true;
+    protected int version = -1;
+
+    protected AbstractTableOperations(
+            FileIO fileIo,
+            ConnectorSession session,
+            String database,
+            String table,
+            Optional<String> owner,
+            Optional<String> location)
+    {
+        this.fileIo = requireNonNull(fileIo, "fileIo is null");
+        this.session = requireNonNull(session, "session is null");
+        this.database = requireNonNull(database, "database is null");
+        this.tableName = requireNonNull(table, "table is null");
+        this.owner = requireNonNull(owner, "owner is null");
+        this.location = requireNonNull(location, "location is null");
+    }
+
+    @Override
+    public void initializeFromMetadata(TableMetadata tableMetadata)
+    {
+        checkState(currentMetadata == null, "already initialized");
+        currentMetadata = tableMetadata;
+        currentMetadataLocation = tableMetadata.metadataFileLocation();
+        shouldRefresh = false;
+        version = parseVersion(currentMetadataLocation);
+    }
+
+    @Override
+    public TableMetadata current()
+    {
+        if (shouldRefresh) {
+            return refresh();
+        }
+        return currentMetadata;
+    }
+
+    @Override
+    public TableMetadata refresh()
+    {
+        if (location.isPresent()) {
+            refreshFromMetadataLocation(null);
+            return currentMetadata;
+        }
+        refreshFromMetadataLocation(getRefreshedLocation());
+        return currentMetadata;
+    }
+
+    protected abstract String getRefreshedLocation();
+
+    @Override
+    public void commit(@Nullable TableMetadata base, TableMetadata metadata)
+    {
+        requireNonNull(metadata, "metadata is null");
+
+        // if the metadata is already out of date, reject it
+        if (!Objects.equals(base, current())) {
+            throw new CommitFailedException("Cannot commit: stale table metadata for %s", getSchemaTableName());
+        }
+
+        // if the metadata is not changed, return early
+        if (Objects.equals(base, metadata)) {
+            return;
+        }
+
+        if (base == null) {
+            commitNewTable(metadata);
+        }
+        else {
+            commitToExistingTable(base, metadata);
+        }
+
+        shouldRefresh = true;
+    }
+
+    protected abstract void commitNewTable(TableMetadata metadata);
+
+    protected abstract void commitToExistingTable(TableMetadata base, TableMetadata metadata);
+
+    @Override
+    public FileIO io()
+    {
+        return fileIo;
+    }
+
+    @Override
+    public String metadataFileLocation(String filename)
+    {
+        TableMetadata metadata = current();
+        String location;
+        if (metadata != null) {
+            String writeLocation = metadata.properties().get(WRITE_METADATA_LOCATION);
+            if (writeLocation != null) {
+                return format("%s/%s", writeLocation, filename);
+            }
+            location = metadata.location();
+        }
+        else {
+            location = this.location.orElseThrow(() -> new IllegalStateException("Location not set"));
+        }
+        return format("%s/%s/%s", location, METADATA_FOLDER_NAME, filename);
+    }
+
+    @Override
+    public LocationProvider locationProvider()
+    {
+        TableMetadata metadata = current();
+        return getLocationProvider(getSchemaTableName(), metadata.location(), metadata.properties());
+    }
+
+    protected SchemaTableName getSchemaTableName()
+    {
+        return new SchemaTableName(database, tableName);
+    }
+
+    protected String writeNewMetadata(TableMetadata metadata, int newVersion)
+    {
+        String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
+        OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
+
+        // write the new metadata
+        TableMetadataParser.write(metadata, newMetadataLocation);
+
+        return newTableMetadataFilePath;
+    }
+
+    protected void refreshFromMetadataLocation(String newLocation)
+    {
+        // use null-safe equality check because new tables have a null metadata location
+        if (Objects.equals(currentMetadataLocation, newLocation)) {
+            shouldRefresh = false;
+            return;
+        }
+
+        AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
+        Tasks.foreach(newLocation)
+                .retry(20)
+                .exponentialBackoff(100, 5000, 600000, 4.0)
+                .run(metadataLocation -> newMetadata.set(
+                        TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
+
+        String newUUID = newMetadata.get().uuid();
+        if (currentMetadata != null) {
+            checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
+                    "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
+        }
+
+        currentMetadata = newMetadata.get();
+        currentMetadataLocation = newLocation;
+        version = parseVersion(newLocation);
+        shouldRefresh = false;
+    }
+
+    protected static String newTableMetadataFilePath(TableMetadata meta, int newVersion)
+    {
+        String codec = meta.property(METADATA_COMPRESSION, METADATA_COMPRESSION_DEFAULT);
+        return metadataFileLocation(meta, format("%05d-%s%s", newVersion, randomUUID(), getFileExtension(codec)));
+    }
+
+    protected static String metadataFileLocation(TableMetadata metadata, String filename)
+    {
+        String location = metadata.properties().get(WRITE_METADATA_LOCATION);
+        if (location != null) {
+            return format("%s/%s", location, filename);
+        }
+        return format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
+    }
+
+    protected static int parseVersion(String metadataLocation)
+    {
+        int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
+        int versionEnd = metadataLocation.indexOf('-', versionStart);
+        try {
+            return parseInt(metadataLocation.substring(versionStart, versionEnd));
+        }
+        catch (NumberFormatException | IndexOutOfBoundsException e) {
+            log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
+            return -1;
+        }
+    }
+}
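
The base class above is a template method around Iceberg's TableOperations: commit() validates staleness centrally and dispatches to backend hooks. A minimal hypothetical subclass, purely illustrative (a real backend must persist the pointer durably; imports and the pointer map are assumed):

    public class InMemoryTableOperations
            extends AbstractTableOperations
    {
        // metadata-location pointers keyed by schema.table; assumed storage for this sketch
        private final ConcurrentMap<SchemaTableName, String> pointers;

        public InMemoryTableOperations(FileIO fileIo, ConnectorSession session, String database, String table, ConcurrentMap<SchemaTableName, String> pointers)
        {
            super(fileIo, session, database, table, Optional.empty(), Optional.empty());
            this.pointers = pointers;
        }

        @Override
        protected String getRefreshedLocation()
        {
            // location of the current metadata JSON file
            return pointers.get(getSchemaTableName());
        }

        @Override
        protected void commitNewTable(TableMetadata metadata)
        {
            pointers.put(getSchemaTableName(), writeNewMetadata(metadata, version + 1));
        }

        @Override
        protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
        {
            String newLocation = writeNewMetadata(metadata, version + 1);
            // compare-and-swap against the pointer observed at refresh time
            if (!pointers.replace(getSchemaTableName(), currentMetadataLocation, newLocation)) {
                throw new CommitFailedException("Cannot commit: stale table metadata for %s", getSchemaTableName());
            }
        }
    }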
IcebergHiveMetastoreCatalogModule()); + bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); + bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); + bindCatalogModule(HADOOP, new IcebergHadoopCatalogModule()); } binder.bind(MetastoreValidator.class).asEagerSingleton(); @@ -69,7 +81,7 @@ public MetastoreValidator(HiveMetastore metastore) } } - private void bindMetastoreModule(CatalogType catalogType, Module module) + private void bindCatalogModule(CatalogType catalogType, Module module) { install(conditionalModule( IcebergConfig.class, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperations.java new file mode 100644 index 000000000000..d084d9d2e543 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperations.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; + +public interface IcebergTableOperations + extends TableOperations +{ + void initializeFromMetadata(TableMetadata tableMetadata); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java new file mode 100644 index 000000000000..0fe022206a8c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog; + +import io.trino.spi.connector.ConnectorSession; + +import java.util.Optional; + +public interface IcebergTableOperationsProvider +{ + IcebergTableOperations createTableOperations( + ConnectorSession session, + String database, + String table, + Optional<String> owner, + Optional<String> location); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java similarity index 98% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index f3364e3912fd..1830594f0dca 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -11,8 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.iceberg; +package io.trino.plugin.iceberg.catalog; +import io.trino.plugin.iceberg.UnknownTableTypeException; import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java new file mode 100644 index 000000000000..1b4760228347 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog; + +public interface TrinoCatalogFactory +{ + TrinoCatalog create(); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java new file mode 100644 index 000000000000..36c4d9559eb8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
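The new SPI is easiest to follow end to end: a TrinoCatalogFactory builds a TrinoCatalog, the catalog asks an IcebergTableOperationsProvider for per-table IcebergTableOperations, and initializeFromMetadata lets a caller seed those operations from cached TableMetadata instead of re-reading the metastore. A hedged usage sketch (catalogFactory, provider, and cachedMetadata are illustrative names; the BaseTable/quotedTableName pattern matches TrinoGlueCatalog.loadTable later in this patch):

    TrinoCatalog catalog = catalogFactory.create();
    IcebergTableOperations ops = provider.createTableOperations(
            session, "sales", "orders", Optional.empty(), Optional.empty());
    // seed from cached metadata, skipping a metastore round trip
    ops.initializeFromMetadata(cachedMetadata);
    Table table = new BaseTable(ops, quotedTableName(new SchemaTableName("sales", "orders")));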
+ */ +package io.trino.plugin.iceberg.catalog.file; + +import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations; +import io.trino.spi.connector.ConnectorSession; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.io.FileIO; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; + +@NotThreadSafe +public class FileMetastoreTableOperations + extends AbstractMetastoreTableOperations +{ + public FileMetastoreTableOperations( + FileIO fileIo, + HiveMetastore metastore, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, metastore, session, database, table, owner, location); + } + + @Override + protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) + { + String newMetadataLocation = writeNewMetadata(metadata, version + 1); + + Table table; + try { + Table currentTable = getTable(); + + checkState(currentMetadataLocation != null, "No current metadata location for existing table"); + String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION); + if (!currentMetadataLocation.equals(metadataLocation)) { + throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s", + currentMetadataLocation, metadataLocation, getSchemaTableName()); + } + + table = Table.builder(currentTable) + .setDataColumns(toHiveColumns(metadata.schema().columns())) + .withStorage(storage -> storage.setLocation(metadata.location())) + .setParameter(METADATA_LOCATION, newMetadataLocation) + .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation) + .build(); + } + catch (RuntimeException e) { + try { + io().deleteFile(newMetadataLocation); + } + catch (RuntimeException ex) { + e.addSuppressed(ex); + } + throw e; + } + + PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner()); + HiveIdentity identity = new HiveIdentity(session); + metastore.replaceTable(identity, database, tableName, table, privileges); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java similarity index 57% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperationsProvider.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java index 545fc563e327..739470488b75 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java @@ -11,12 +11,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
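commitToExistingTable above is an optimistic compare-and-swap on the metastore's metadata pointer. Reduced to its essentials (identifiers as in the class above; newTable and privileges stand in for the rebuilt Table and its initial privilege set):

    String newMetadataLocation = writeNewMetadata(metadata, version + 1);
    Table currentTable = getTable();
    // abort if another writer moved the pointer since this writer last refreshed
    if (!currentMetadataLocation.equals(currentTable.getParameters().get(METADATA_LOCATION))) {
        io().deleteFile(newMetadataLocation); // drop the now-orphaned metadata file
        throw new CommitFailedException("concurrent update"); // Iceberg's commit loop can retry
    }
    // swap the pointer; the old location is preserved under PREVIOUS_METADATA_LOCATION
    metastore.replaceTable(new HiveIdentity(session), database, tableName, newTable, privileges);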
*/ -package io.trino.plugin.iceberg; +package io.trino.plugin.iceberg.catalog.file; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; -import io.trino.plugin.hive.authentication.HiveIdentity; import io.trino.plugin.hive.metastore.HiveMetastore; -import org.apache.iceberg.TableOperations; +import io.trino.plugin.iceberg.FileIoProvider; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.connector.ConnectorSession; import javax.inject.Inject; @@ -24,30 +26,31 @@ import static java.util.Objects.requireNonNull; -public class HiveTableOperationsProvider +public class FileMetastoreTableOperationsProvider + implements IcebergTableOperationsProvider { + private final HiveMetastore hiveMetastore; private final FileIoProvider fileIoProvider; @Inject - public HiveTableOperationsProvider(FileIoProvider fileIoProvider) + public FileMetastoreTableOperationsProvider(HiveMetastore hiveMetastore, FileIoProvider fileIoProvider) { + this.hiveMetastore = requireNonNull(hiveMetastore, "hiveMetastore is null"); this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); } - public TableOperations createTableOperations( - HiveMetastore hiveMetastore, - HdfsContext hdfsContext, - String queryId, - HiveIdentity identity, + @Override + public IcebergTableOperations createTableOperations( + ConnectorSession session, String database, String table, Optional owner, Optional location) { - return new HiveTableOperations( - fileIoProvider.createFileIo(hdfsContext, queryId), + return new FileMetastoreTableOperations( + fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), hiveMetastore, - identity, + session, database, table, owner, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java new file mode 100644 index 000000000000..96d744723cb5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.file; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.metastore.file.FileMetastoreModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; + +public class IcebergFileMetastoreCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + install(new FileMetastoreModule()); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java new file mode 100644 index 000000000000..eeb55a20fec0 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java @@ -0,0 +1,171 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
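With IcebergFileMetastoreCatalogModule in place, the testing file metastore is selected purely by configuration. A minimal sketch in the style of the TestIcebergPlugin changes further down (property values are illustrative; hive.metastore.catalog.dir belongs to the installed FileMetastoreModule):

    factory.create(
            "test",
            Map.of(
                    "iceberg.catalog.type", "TESTING_FILE_METASTORE",
                    "hive.metastore.catalog.dir", "file:///tmp/iceberg-catalog"),
            new TestingConnectorContext())
            .shutdown();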
+ */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.AlreadyExistsException; +import com.amazonaws.services.glue.model.ConcurrentModificationException; +import com.amazonaws.services.glue.model.CreateTableRequest; +import com.amazonaws.services.glue.model.GetTableRequest; +import com.amazonaws.services.glue.model.Table; +import com.amazonaws.services.glue.model.TableInput; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.plugin.hive.TableAlreadyExistsException; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.UnknownTableTypeException; +import io.trino.plugin.iceberg.catalog.AbstractTableOperations; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; + +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; + +public class GlueTableOperations + extends AbstractTableOperations +{ + private static final Logger log = Logger.get(GlueTableOperations.class); + + private final AWSGlueAsync glueClient; + private final GlueMetastoreStats stats; + private final String catalogId; + + public GlueTableOperations( + AWSGlueAsync glueClient, + GlueMetastoreStats stats, + String catalogId, + FileIO fileIo, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, session, database, table, owner, location); + this.glueClient = glueClient; + this.stats = stats; + this.catalogId = catalogId; + } + + @Override + protected String getRefreshedLocation() + { + return stats.getGetTable().call(() -> { + Table table = glueClient.getTable(new GetTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(database) + .withName(tableName)).getTable(); + + if (isPrestoView(table) && isHiveOrPrestoView(table)) { + // this is a Presto Hive view, hence not a table + throw new TableNotFoundException(getSchemaTableName()); + } + if (!isIcebergTable(table)) { + throw new UnknownTableTypeException(getSchemaTableName()); + } + + String metadataLocation = table.getParameters().get(METADATA_LOCATION); + if (metadataLocation == null) { + throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName())); + } + return metadataLocation; + }); + } + + @Override + protected void commitNewTable(TableMetadata metadata) + { + String newMetadataLocation = writeNewMetadata(metadata, version + 1); + Map parameters = ImmutableMap.of( + TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH), + METADATA_LOCATION, newMetadataLocation); + TableInput tableInput = new TableInput() + .withName(tableName) + 
.withTableType(EXTERNAL_TABLE.name()) + .withOwner(owner.orElse(null)) + .withParameters(parameters); + + boolean succeeded = false; + try { + stats.getCreateTable().call(() -> { + glueClient.createTable(new CreateTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(database) + .withTableInput(tableInput)); + return null; + }); + succeeded = true; + } + catch (ConcurrentModificationException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s because Glue detected concurrent update", getSchemaTableName()), e); + } + catch (AlreadyExistsException e) { + throw new TableAlreadyExistsException(getSchemaTableName()); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s due to unexpected exception", getSchemaTableName()), e); + } + finally { + cleanupMetadataLocation(!succeeded, newMetadataLocation); + } + } + + @Override + protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) + { + throw new TrinoException(NOT_SUPPORTED, "Table update is not supported by Trino Glue catalog"); + } + + private boolean isPrestoView(Table table) + { + return "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG)); + } + + private boolean isHiveOrPrestoView(Table table) + { + return table.getTableType().equals(TableType.VIRTUAL_VIEW.name()); + } + + private boolean isIcebergTable(Table table) + { + return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP)); + } + + private void cleanupMetadataLocation(boolean shouldCleanup, String metadataLocation) + { + if (shouldCleanup) { + try { + io().deleteFile(metadataLocation); + } + catch (RuntimeException ex) { + log.error(ex, "Failed to clean up metadata file at %s", metadataLocation); + throw ex; + } + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperationsProvider.java new file mode 100644 index 000000000000..fb581e505617 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperationsProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
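Once commitNewTable above succeeds, other engines discover the table through the two Glue parameters it writes; the keys come from Iceberg's BaseMetastoreTableOperations constants, while the values here are illustrative:

    // Name:       orders
    // TableType:  EXTERNAL_TABLE
    // Parameters: table_type        = ICEBERG
    //             metadata_location = s3://warehouse/sales.db/orders/metadata/00001-<uuid>.metadata.json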
+ */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.FileIoProvider; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.connector.ConnectorSession; + +import javax.inject.Inject; + +import java.util.Optional; + +import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static java.util.Objects.requireNonNull; + +public class GlueTableOperationsProvider + implements IcebergTableOperationsProvider +{ + private final AWSGlueAsync glueClient; + private final String catalogId; + private final FileIoProvider fileIoProvider; + private final GlueMetastoreStats stats = new GlueMetastoreStats(); + + @Inject + public GlueTableOperationsProvider( + GlueHiveMetastoreConfig glueConfig, + FileIoProvider fileIoProvider) + { + this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); + requireNonNull(glueConfig, "glueConfig is null"); + this.glueClient = createAsyncGlueClient(glueConfig, Optional.empty(), stats.newRequestMetricsCollector()); + this.catalogId = glueConfig.getCatalogId().orElse(null); + } + + @Override + public IcebergTableOperations createTableOperations( + ConnectorSession session, + String database, + String table, + Optional<String> owner, + Optional<String> location) + { + return new GlueTableOperations( + glueClient, + stats, + catalogId, + fileIoProvider.createFileIo(new HdfsEnvironment.HdfsContext(session), session.getQueryId()), + session, + database, + table, + owner, + location); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java new file mode 100644 index 000000000000..6ffa45e469f5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
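The Glue catalog is likewise enabled by configuration alone; the module that follows installs GlueMetastoreModule for its config bindings. A sketch matching the updated TestIcebergPlugin cases below (region and warehouse values are illustrative):

    factory.create(
            "test",
            Map.of(
                    "iceberg.catalog.type", "glue",
                    "hive.metastore.glue.region", "us-east-2",
                    "iceberg.catalog.warehouse", "s3://my-bucket/warehouse"),
            new TestingConnectorContext())
            .shutdown();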
+ */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +public class IcebergGlueCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + install(new GlueMetastoreModule()); + binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergTableOperationsProvider.class).to(GlueTableOperationsProvider.class).in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java new file mode 100644 index 000000000000..6685882c08e9 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -0,0 +1,437 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.AlreadyExistsException; +import com.amazonaws.services.glue.model.CreateDatabaseRequest; +import com.amazonaws.services.glue.model.Database; +import com.amazonaws.services.glue.model.DatabaseInput; +import com.amazonaws.services.glue.model.DeleteDatabaseRequest; +import com.amazonaws.services.glue.model.DeleteTableRequest; +import com.amazonaws.services.glue.model.EntityNotFoundException; +import com.amazonaws.services.glue.model.GetDatabaseRequest; +import com.amazonaws.services.glue.model.GetDatabasesRequest; +import com.amazonaws.services.glue.model.GetDatabasesResult; +import com.amazonaws.services.glue.model.GetTablesRequest; +import com.amazonaws.services.glue.model.GetTablesResult; +import com.amazonaws.services.glue.model.UpdateDatabaseRequest; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import io.airlift.log.Logger; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.SchemaAlreadyExistsException; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.TrinoPrincipal; +import 
org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.toMap; +import static org.apache.iceberg.TableMetadata.newTableMetadata; +import static org.apache.iceberg.Transactions.createTableTransaction; + +public class TrinoGlueCatalog + implements TrinoCatalog +{ + private static final Logger log = Logger.get(TrinoGlueCatalog.class); + + private final HdfsEnvironment hdfsEnvironment; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final String warehouse; + private final boolean isUniqueTableLocation; + private final AWSGlueAsync glueClient; + private final String catalogId; + private final GlueMetastoreStats stats; + + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + public TrinoGlueCatalog( + HdfsEnvironment hdfsEnvironment, + IcebergTableOperationsProvider tableOperationsProvider, + AWSGlueAsync glueClient, + GlueMetastoreStats stats, + String catalogId, + String warehouse, + boolean isUniqueTableLocation) + { + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.glueClient = requireNonNull(glueClient, "glueClient is null"); + this.stats = requireNonNull(stats, "stats is null"); + this.catalogId = catalogId; + this.warehouse = warehouse; + this.isUniqueTableLocation = isUniqueTableLocation; + } + + @Override + public List listNamespaces(ConnectorSession session) + { + try { + return stats.getGetAllDatabases().call(() -> { + List namespaces = new ArrayList<>(); + String nextToken = null; + + do { + GetDatabasesResult result = glueClient.getDatabases(new GetDatabasesRequest().withCatalogId(catalogId).withNextToken(nextToken)); + nextToken = result.getNextToken(); + result.getDatabaseList().forEach(database -> namespaces.add(database.getName())); + } + while (nextToken != null); + + return namespaces; + }); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public boolean dropNamespace(ConnectorSession session, String namespace) + { + try { + stats.getDropDatabase().call(() -> glueClient.deleteDatabase(new DeleteDatabaseRequest() + .withCatalogId(catalogId).withName(namespace))); + return true; + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Map 
loadNamespaceMetadata(ConnectorSession session, String namespace) + { + try { + return stats.getGetDatabase().call(() -> + glueClient.getDatabase(new GetDatabaseRequest().withCatalogId(catalogId).withName(namespace)) + .getDatabase().getParameters().entrySet().stream() + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + throw new TrinoException(NOT_SUPPORTED, "get namespace principal is not supported by Glue"); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + try { + stats.getCreateDatabase().call(() -> glueClient.createDatabase(new CreateDatabaseRequest() + .withCatalogId(catalogId) + .withDatabaseInput(new DatabaseInput() + .withName(namespace) + .withParameters(properties.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> e.getValue().toString())))))); + } + catch (AlreadyExistsException e) { + throw new SchemaAlreadyExistsException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "set namespace principal is not supported by Glue"); + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + try { + Database sourceDb = stats.getGetDatabase().call(() -> + glueClient.getDatabase(new GetDatabaseRequest().withCatalogId(catalogId).withName(source))).getDatabase(); + + DatabaseInput targetInput = new DatabaseInput() + .withName(target) + .withParameters(sourceDb.getParameters()) + .withLocationUri(sourceDb.getLocationUri()) + .withDescription(sourceDb.getDescription()); + + stats.getRenameDatabase().call(() -> glueClient.updateDatabase(new UpdateDatabaseRequest() + .withCatalogId(catalogId).withName(target).withDatabaseInput(targetInput))); + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(source); + } + catch (AmazonServiceException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, e); + } + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + try { + return stats.getGetAllTables().call(() -> { + List namespaces = namespace.isPresent() ? 
Lists.newArrayList(namespace.get()) : listNamespaces(session); + + List tableNames = new ArrayList<>(); + String nextToken = null; + + for (String ns : namespaces) { + do { + GetTablesResult result = glueClient.getTables(new GetTablesRequest() + .withCatalogId(catalogId) + .withDatabaseName(ns) + .withNextToken(nextToken)); + result.getTableList().stream() + .map(com.amazonaws.services.glue.model.Table::getName) + .forEach(name -> tableNames.add(new SchemaTableName(ns, name))); + nextToken = result.getNextToken(); + } + while (nextToken != null); + } + return tableNames; + }); + } + catch (EntityNotFoundException e) { + // database does not exist + return ImmutableList.of(); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName table) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + table, + ignore -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + operations.initializeFromMetadata(metadata); + return new BaseTable(operations, quotedTableName(table)); + } + + @Override + public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData) + { + Table table = loadTable(session, schemaTableName); + validateTableCanBeDropped(table, schemaTableName); + try { + stats.getDropTable().call(() -> + glueClient.deleteTable(new DeleteTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(schemaTableName.getSchemaName()) + .withName(schemaTableName.getTableName()))); + } + catch (AmazonServiceException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, e); + } + + Path tableLocation = new Path(table.location()); + if (purgeData) { + try { + hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), tableLocation).delete(tableLocation, true); + } + catch (Exception e) { + // don't fail if unable to delete path + log.warn(e, "Failed to delete path: " + tableLocation); + } + } + return true; + } + + @Override + public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, Schema schema, PartitionSpec partitionSpec, + String location, Map properties) + { + TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); + TableOperations ops = tableOperationsProvider.createTableOperations( + session, + schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + Optional.of(session.getUser()), + Optional.of(location)); + return createTableTransaction(schemaTableName.toString(), ops, metadata); + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + throw new TrinoException(NOT_SUPPORTED, "renameTable is not supported by Trino Glue catalog"); + } + + @Override + public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "updateTableComment is not supported by Trino Glue catalog"); + } + + @Override + public String defaultTableLocation(ConnectorSession session, 
SchemaTableName schemaTableName) + { + String dbLocation = stats.getGetDatabase().call(() -> glueClient.getDatabase(new GetDatabaseRequest() + .withCatalogId(catalogId).withName(schemaTableName.getSchemaName())) + .getDatabase().getLocationUri()); + + String location; + if (dbLocation == null) { + if (warehouse == null) { + throw new TrinoException(HIVE_DATABASE_LOCATION_ERROR, format("Database '%s' location cannot be determined, " + + "please either set 'location' when creating the database, or set 'iceberg.catalog.warehouse' " + + "to allow a default location at '/.db'", schemaTableName.getSchemaName())); + } + location = format("%s/%s.db/%s", warehouse, schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } + else { + location = format("%s/%s", dbLocation, schemaTableName.getTableName()); + } + + if (isUniqueTableLocation) { + location = location + "-" + randomUUID().toString().replace("-", ""); + } + return location; + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported by Trino Glue catalog"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, "createView is not supported by Trino Glue catalog"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameView is not supported by Trino Glue catalog"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported by Trino Glue catalog"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropView is not supported by Trino Glue catalog"); + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + throw new TrinoException(NOT_SUPPORTED, "listViews is not supported by Trino Glue catalog"); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + throw new TrinoException(NOT_SUPPORTED, "getViews is not supported by Trino Glue catalog"); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + { + throw new TrinoException(NOT_SUPPORTED, "getView is not supported by Trino Glue catalog"); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + throw new TrinoException(NOT_SUPPORTED, "listMaterializedViews is not supported by Trino Glue catalog"); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, + boolean replace, boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported by Trino Glue catalog"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported by Trino Glue catalog"); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new 
TrinoException(NOT_SUPPORTED, "getMaterializedView is not supported by Trino Glue catalog"); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported by Trino Glue catalog"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java new file mode 100644 index 000000000000..0c138aea13e1 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +import javax.inject.Inject; + +import java.util.Optional; + +import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static java.util.Objects.requireNonNull; + +public class TrinoGlueCatalogFactory + implements TrinoCatalogFactory +{ + private final HdfsEnvironment hdfsEnvironment; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final String warehouse; + private final boolean isUniqueTableLocation; + private final AWSGlueAsync glueClient; + private final String catalogId; + private final GlueMetastoreStats stats = new GlueMetastoreStats(); + + @Inject + public TrinoGlueCatalogFactory( + HdfsEnvironment hdfsEnvironment, + IcebergTableOperationsProvider tableOperationsProvider, + GlueHiveMetastoreConfig glueConfig, + IcebergConfig icebergConfig) + { + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + requireNonNull(glueConfig, "glueConfig is null"); + this.glueClient = createAsyncGlueClient(glueConfig, Optional.empty(), stats.newRequestMetricsCollector()); + this.catalogId = glueConfig.getCatalogId().orElse(null); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.warehouse = icebergConfig.getCatalogWarehouse(); + this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation(); + } + + public TrinoCatalog create() + { + return new TrinoGlueCatalog(hdfsEnvironment, tableOperationsProvider, glueClient, stats, catalogId, warehouse, isUniqueTableLocation); + } +} diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperations.java new file mode 100644 index 000000000000..30db9d2eb70c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperations.java @@ -0,0 +1,2 @@ +package io.trino.plugin.iceberg.catalog.hadoop;public class HadoopTableOperations { +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperationsProvider.java new file mode 100644 index 000000000000..4aea36c125d8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/HadoopTableOperationsProvider.java @@ -0,0 +1,2 @@ +package io.trino.plugin.iceberg.catalog.hadoop;public class HadoopTableOperationsProvider { +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/IcebergHadoopCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/IcebergHadoopCatalogModule.java new file mode 100644 index 000000000000..6ffa45e469f5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/IcebergHadoopCatalogModule.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.hadoop; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +public class IcebergHadoopCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(TrinoCatalogFactory.class).to(TrinoHadoopCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergTableOperationsProvider.class).to(HadoopTableOperationsProvider.class).in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalog.java new file mode 100644 index 000000000000..4040973e681b --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalog.java @@ -0,0 +1,2 @@ +package io.trino.plugin.iceberg.catalog.hadoop;public class TrinoHadoopCatalog { +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalogFactory.java new file mode 100644 index 000000000000..fb0c4bcc5c93 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hadoop/TrinoHadoopCatalogFactory.java @@ -0,0 +1,2 @@ +package io.trino.plugin.iceberg.catalog.hadoop;public class TrinoHadoopCatalogFactory { +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java new file mode 100644 index 000000000000..2bb7d8b244a5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
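Unlike the Glue and file-metastore commit paths, the HiveMetastoreTableOperations that follows serializes concurrent writers with the metastore-level lock added to ThriftMetastore earlier in this patch. The locking discipline, in outline:

    long lockId = thriftMetastore.acquireTableExclusiveLock(identity, session.getQueryId(), database, tableName);
    try {
        // re-read the table, verify metadata_location is unchanged, then replaceTable(...)
    }
    finally {
        thriftMetastore.releaseTableLock(identity, lockId);
    }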
+ */ +package io.trino.plugin.iceberg.catalog.hms; + +import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastore; +import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.io.FileIO; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable; +import static java.util.Objects.requireNonNull; + +@NotThreadSafe +public class HiveMetastoreTableOperations + extends AbstractMetastoreTableOperations +{ + private final ThriftMetastore thriftMetastore; + + public HiveMetastoreTableOperations( + FileIO fileIo, + HiveMetastore metastore, + ThriftMetastore thriftMetastore, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, metastore, session, database, table, owner, location); + this.thriftMetastore = requireNonNull(thriftMetastore, "thriftMetastore is null"); + } + + @Override + protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) + { + String newMetadataLocation = writeNewMetadata(metadata, version + 1); + HiveIdentity identity = new HiveIdentity(session); + + long lockId = thriftMetastore.acquireTableExclusiveLock( + identity, + session.getQueryId(), + database, + tableName); + try { + Table table; + try { + Table currentTable = fromMetastoreApiTable(thriftMetastore.getTable(identity, database, tableName) + .orElseThrow(() -> new TableNotFoundException(getSchemaTableName()))); + + checkState(currentMetadataLocation != null, "No current metadata location for existing table"); + String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION); + if (!currentMetadataLocation.equals(metadataLocation)) { + throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s", + currentMetadataLocation, metadataLocation, getSchemaTableName()); + } + + table = Table.builder(currentTable) + .setDataColumns(toHiveColumns(metadata.schema().columns())) + .withStorage(storage -> storage.setLocation(metadata.location())) + .setParameter(METADATA_LOCATION, newMetadataLocation) + .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation) + .build(); + } + catch (RuntimeException e) { + try { + io().deleteFile(newMetadataLocation); + } + catch (RuntimeException ex) { + e.addSuppressed(ex); + } + throw e; + } + + PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner()); + metastore.replaceTable(identity, database, tableName, table, privileges); + } + finally { + thriftMetastore.releaseTableLock(identity, lockId); + } + + shouldRefresh = true; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java new file mode 100644 index 
000000000000..6d12638d621e --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.hms; + +import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastore; +import io.trino.plugin.iceberg.FileIoProvider; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.connector.ConnectorSession; + +import javax.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class HiveMetastoreTableOperationsProvider + implements IcebergTableOperationsProvider +{ + private final HiveMetastore hiveMetastore; + private final FileIoProvider fileIoProvider; + private final ThriftMetastore thriftMetastore; + + @Inject + public HiveMetastoreTableOperationsProvider(HiveMetastore hiveMetastore, FileIoProvider fileIoProvider, ThriftMetastore thriftMetastore) + { + this.hiveMetastore = requireNonNull(hiveMetastore, "hiveMetastore is null"); + this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); + this.thriftMetastore = requireNonNull(thriftMetastore, "thriftMetastore is null"); + } + + @Override + public IcebergTableOperations createTableOperations( + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + return new HiveMetastoreTableOperations( + fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), + hiveMetastore, + thriftMetastore, + session, + database, + table, + owner, + location); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java new file mode 100644 index 000000000000..e1dd90d7c729 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.hms; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +public class IcebergHiveMetastoreCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + install(new ThriftMetastoreModule()); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java similarity index 96% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 4c3b489fdb17..667e91157fa0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -39,7 +39,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.iceberg; +package io.trino.plugin.iceberg.catalog.hms; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -60,6 +60,10 @@ import io.trino.plugin.hive.metastore.HivePrincipal; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.util.HiveUtil; +import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; +import io.trino.plugin.iceberg.IcebergUtil; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -115,6 +119,7 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; @@ -128,12 +133,9 @@ import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; -import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; -import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; import static org.apache.iceberg.Transactions.createTableTransaction; -class TrinoHiveCatalog +public class TrinoHiveCatalog implements TrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); @@ -152,7 +154,7 @@ class TrinoHiveCatalog private 
final HiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; - private final HiveTableOperationsProvider tableOperationsProvider; + private final IcebergTableOperationsProvider tableOperationsProvider; private final String trinoVersion; private final boolean useUniqueTableLocation; @@ -164,7 +166,7 @@ public TrinoHiveCatalog( HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, - HiveTableOperationsProvider tableOperationsProvider, + IcebergTableOperationsProvider tableOperationsProvider, String trinoVersion, boolean useUniqueTableLocation) { @@ -260,10 +262,7 @@ public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTab { TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); TableOperations ops = tableOperationsProvider.createTableOperations( - metastore, - new HdfsContext(session), - session.getQueryId(), - new HiveIdentity(session), + session, schemaTableName.getSchemaName(), schemaTableName.getTableName(), Optional.of(session.getUser()), @@ -295,13 +294,8 @@ public List listTables(ConnectorSession session, Optional ((BaseTable) loadIcebergTable(metastore, tableOperationsProvider, session, schemaTableName)).operations().current()); + ignore -> ((BaseTable) loadIcebergTable(tableOperationsProvider, session, schemaTableName)).operations().current()); return getIcebergTableWithMetadata(metastore, tableOperationsProvider, session, schemaTableName, metadata); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java similarity index 58% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java index ca5b39413938..999549bb4b57 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java @@ -11,54 +11,42 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java
similarity index 58%
rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java
rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java
index ca5b39413938..999549bb4b57 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java
@@ -11,54 +11,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.iceberg;
+package io.trino.plugin.iceberg.catalog.hms;
 
 import io.trino.plugin.base.CatalogName;
 import io.trino.plugin.hive.HdfsEnvironment;
 import io.trino.plugin.hive.NodeVersion;
 import io.trino.plugin.hive.metastore.HiveMetastore;
-import io.trino.spi.TrinoException;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.spi.type.TypeManager;
 
 import javax.inject.Inject;
 
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
-import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 
-public class TrinoCatalogFactory
+public class TrinoHiveCatalogFactory
+        implements TrinoCatalogFactory
 {
     private final CatalogName catalogName;
     private final HiveMetastore metastore;
     private final HdfsEnvironment hdfsEnvironment;
     private final TypeManager typeManager;
-    private final HiveTableOperationsProvider tableOperationsProvider;
+    private final IcebergTableOperationsProvider tableOperationsProvider;
     private final String trinoVersion;
-    private final CatalogType catalogType;
     private final boolean isUniqueTableLocation;
 
     @Inject
-    public TrinoCatalogFactory(
+    public TrinoHiveCatalogFactory(
             IcebergConfig config,
             CatalogName catalogName,
             HiveMetastore metastore,
             HdfsEnvironment hdfsEnvironment,
             TypeManager typeManager,
-            HiveTableOperationsProvider tableOperationsProvider,
+            IcebergTableOperationsProvider tableOperationsProvider,
             NodeVersion nodeVersion)
     {
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
@@ -68,20 +56,12 @@ public TrinoCatalogFactory(
         this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
         this.trinoVersion = requireNonNull(nodeVersion, "trinoVersion is null").toString();
         requireNonNull(config, "config is null");
-        this.catalogType = config.getCatalogType();
         this.isUniqueTableLocation = config.isUniqueTableLocation();
     }
 
     public TrinoCatalog create()
     {
-        switch (catalogType) {
-            case TESTING_FILE_METASTORE:
-            case HIVE_METASTORE:
-                return new TrinoHiveCatalog(catalogName, memoizeMetastore(metastore, 1000), hdfsEnvironment, typeManager, tableOperationsProvider, trinoVersion, isUniqueTableLocation);
-            case GLUE:
-                // TODO not supported yet
-                throw new TrinoException(NOT_SUPPORTED, "Unknown Trino Iceberg catalog type");
-        }
-        throw new TrinoException(NOT_SUPPORTED, "Unsupported Trino Iceberg catalog type " + catalogType);
+        return new TrinoHiveCatalog(catalogName, memoizeMetastore(metastore, 1000), hdfsEnvironment, typeManager, tableOperationsProvider,
+                trinoVersion, isUniqueTableLocation);
     }
 }
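With the catalog-type switch deleted from create(), choosing between the file, Hive, and Glue catalogs has to happen when the injector is assembled rather than at call time. A hypothetical sketch of that dispatch, using only the catalog types and module names visible in this patch (the actual dispatching module is not shown in this excerpt and may be organized differently):

    import com.google.inject.Binder;

    import io.airlift.configuration.AbstractConfigurationAwareModule;

    // Sketch only: one Guice module per catalog type, installed based on configuration.
    public class IcebergCatalogModuleSketch
            extends AbstractConfigurationAwareModule
    {
        @Override
        protected void setup(Binder binder)
        {
            CatalogType catalogType = buildConfigObject(IcebergConfig.class).getCatalogType();
            switch (catalogType) {
                case TESTING_FILE_METASTORE:
                    install(new IcebergFileMetastoreCatalogModule());
                    return;
                case HIVE_METASTORE:
                    install(new IcebergHiveMetastoreCatalogModule());
                    return;
                case GLUE:
                    install(new IcebergGlueCatalogModule());
                    return;
            }
            throw new IllegalArgumentException("Unsupported catalog type: " + catalogType);
        }
    }

Each per-type module then binds TrinoCatalogFactory and IcebergTableOperationsProvider the way IcebergHiveMetastoreCatalogModule does above, so consumers only ever inject the interfaces.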
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
index 292347ba5bc6..a801cfeb25de 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
@@ -39,7 +39,8 @@ public void testDefaults()
                 .setUseFileSizeFromMetadata(true)
                 .setMaxPartitionsPerWriter(100)
                 .setUniqueTableLocation(false)
-                .setCatalogType(HIVE_METASTORE));
+                .setCatalogType(HIVE_METASTORE)
+                .setCatalogWarehouse(null));
     }
 
     @Test
@@ -52,6 +53,7 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.max-partitions-per-writer", "222")
                 .put("iceberg.unique-table-location", "true")
                 .put("iceberg.catalog.type", "GLUE")
+                .put("iceberg.catalog.warehouse", "/tmp")
                 .build();
 
         IcebergConfig expected = new IcebergConfig()
@@ -60,7 +62,8 @@ public void testExplicitPropertyMappings()
                 .setUseFileSizeFromMetadata(false)
                 .setMaxPartitionsPerWriter(222)
                 .setUniqueTableLocation(true)
-                .setCatalogType(GLUE);
+                .setCatalogType(GLUE)
+                .setCatalogWarehouse("/tmp");
 
         assertFullMapping(properties, expected);
     }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java
index 6237de8b4ba0..e2d207ce9a23 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java
@@ -65,11 +65,11 @@ public void testGlueMetastore()
     {
         ConnectorFactory factory = getConnectorFactory();
 
-        assertThatThrownBy(() -> factory.create(
+        factory.create(
                 "test",
                 Map.of("iceberg.catalog.type", "glue"),
-                new TestingConnectorContext()))
-                .hasMessageContaining("Explicit bindings are required and HiveMetastore is not explicitly bound");
+                new TestingConnectorContext())
+                .shutdown();
 
         assertThatThrownBy(() -> factory.create(
                 "test",
@@ -96,15 +96,14 @@ public void testRecordingMetastore()
                 .shutdown();
 
         // recording with glue
-        assertThatThrownBy(() -> factory.create(
+        factory.create(
                 "test",
                 Map.of(
                         "iceberg.catalog.type", "glue",
                         "hive.metastore.glue.region", "us-east-2",
                         "hive.metastore-recording-path", "/tmp"),
-                new TestingConnectorContext()))
-                .hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used")
-                .hasMessageContaining("Configuration property 'hive.metastore.glue.region' was not used");
+                new TestingConnectorContext())
+                .shutdown();
     }
 
     private static ConnectorFactory getConnectorFactory()
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
index 0f38167e2f96..0e5f627fbe8a 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -23,6 +23,8 @@
 import io.trino.plugin.hive.HiveHdfsConfiguration;
 import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
 import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.QueryRunner;
@@ -176,8 +178,8 @@ private void writeEqualityDeleteToNationTable(Table icebergTable)
 
     private Table updateTableToV2(String tableName)
     {
-        HiveTableOperationsProvider tableOperationsProvider = new HiveTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment));
-        BaseTable table = (BaseTable) loadIcebergTable(metastore, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName));
+        IcebergTableOperationsProvider tableOperationsProvider = new FileMetastoreTableOperationsProvider(metastore, new HdfsFileIoProvider(hdfsEnvironment));
+        BaseTable table = (BaseTable) loadIcebergTable(tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName));
         TableOperations operations = table.operations();
         TableMetadata currentMetadata = operations.current();
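Taken together, the test changes above show that a Glue-backed Iceberg catalog now boots instead of failing at binding time, and that iceberg.catalog.warehouse is a recognized property. A hypothetical catalog configuration exercising the new settings; the property names come from the tests above, while the values are illustrative only:

    # etc/catalog/iceberg.properties (illustrative values)
    connector.name=iceberg
    iceberg.catalog.type=GLUE
    iceberg.catalog.warehouse=s3://example-bucket/warehouse
    hive.metastore.glue.region=us-east-2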
diff --git a/testing/trino-product-tests/pom.xml b/testing/trino-product-tests/pom.xml
index d40bce87cd29..cf9cc7c45ce3 100644
--- a/testing/trino-product-tests/pom.xml
+++ b/testing/trino-product-tests/pom.xml
@@ -76,6 +76,11 @@
             <artifactId>tempto-runner</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>http-client</artifactId>
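The io.airlift concurrent artifact added above supplies MoreFutures for the new product test that follows. The test's synchronization idiom, isolated as a runnable sketch (class and method names here are illustrative, not part of the patch): a CyclicBarrier lines all worker threads up before each round, so every round of INSERTs is genuinely concurrent.

    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Sketch only: all workers block on the barrier, so each round starts
    // simultaneously on every thread, the same trick the test below uses.
    public class BarrierRoundsSketch
    {
        public static void main(String[] args)
                throws Exception
        {
            int threads = 3;
            int rounds = 7;
            CyclicBarrier barrier = new CyclicBarrier(threads);
            ExecutorService executor = Executors.newFixedThreadPool(threads);
            for (int t = 0; t < threads; t++) {
                int thread = t;
                executor.submit(() -> {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // every thread enters the round together
                        System.out.printf("thread %d starts round %d%n", thread, round);
                    }
                    return null;
                });
            }
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
    }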
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java
new file mode 100644
index 000000000000..f0128572d35f
--- /dev/null
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests.product.iceberg;
+
+import io.airlift.concurrent.MoreFutures;
+import io.trino.tempto.ProductTest;
+import io.trino.tempto.assertions.QueryAssert;
+import io.trino.tempto.query.QueryExecutionException;
+import io.trino.tempto.query.QueryExecutor;
+import org.assertj.core.api.Assertions;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.tempto.assertions.QueryAssert.assertThat;
+import static io.trino.tests.product.TestGroups.HMS_ONLY;
+import static io.trino.tests.product.TestGroups.ICEBERG;
+import static io.trino.tests.product.TestGroups.STORAGE_FORMATS_DETAILED;
+import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
+import static io.trino.tests.product.utils.QueryExecutors.onTrino;
+
+public class TestIcebergInsert
+        extends ProductTest
+{
+    /**
+     * @see TestIcebergCreateTable#testCreateTable() for non-concurrent INSERT test coverage
+     */
+    @Test(groups = {ICEBERG, STORAGE_FORMATS_DETAILED, HMS_ONLY}, timeOut = 60_000)
+    public void testIcebergConcurrentInsert()
+            throws Exception
+    {
+        int threads = 3;
+        int insertsPerThread = 7;
+
+        String tableName = "iceberg.default.test_insert_concurrent_" + randomTableSuffix();
+        onTrino().executeQuery("CREATE TABLE " + tableName + "(a bigint)");
+
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        QueryExecutor onTrino = onTrino();
+        List<Long> allInserted = executor.invokeAll(
+                IntStream.range(0, threads)
+                        .mapToObj(thread -> (Callable<List<Long>>) () -> {
+                            List<Long> inserted = new ArrayList<>();
+                            for (int i = 0; i < insertsPerThread; i++) {
+                                barrier.await();
+                                long value = i + (long) insertsPerThread * thread;
+                                try {
+                                    onTrino.executeQuery("INSERT INTO " + tableName + " VALUES " + value);
+                                }
+                                catch (QueryExecutionException queryExecutionException) {
+                                    // failed to insert
+                                    continue;
+                                }
+                                inserted.add(value);
+                            }
+                            return inserted;
+                        })
+                        .collect(toImmutableList())).stream()
+                .map(MoreFutures::getDone)
+                .flatMap(List::stream)
+                .collect(toImmutableList());
+
+        // At least one INSERT per round should succeed
+        Assertions.assertThat(allInserted).hasSizeBetween(insertsPerThread, threads * insertsPerThread);
+
+        assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
+                .containsOnly(allInserted.stream()
+                        .map(QueryAssert.Row::row)
+                        .toArray(QueryAssert.Row[]::new));
+
+        onTrino().executeQuery("DROP TABLE " + tableName);
+    }
+}
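A note on the assertion bounds in testIcebergConcurrentInsert: with threads = 3 and insertsPerThread = 7, hasSizeBetween(insertsPerThread, threads * insertsPerThread) reads as hasSizeBetween(7, 21). The lower bound encodes "at least one INSERT per barrier-aligned round must commit"; the upper bound is all 21 INSERTs succeeding. The containsOnly check then verifies the table holds exactly the values whose INSERT reported success, so no acknowledged write was lost and no failed write leaked in.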