From cd1965dc1aa88c8e42ba018673db3533d29c10a5 Mon Sep 17 00:00:00 2001 From: Jake Long Date: Mon, 29 Jan 2024 17:34:18 -0800 Subject: [PATCH] Add support for object store file layout in Iceberg --- docs/src/main/sphinx/connector/iceberg.md | 11 +++ .../trino/plugin/iceberg/IcebergConfig.java | 14 ++++ .../trino/plugin/iceberg/IcebergMetadata.java | 24 ++++++ .../iceberg/IcebergTableProperties.java | 25 ++++++ .../io/trino/plugin/iceberg/IcebergUtil.java | 35 +++++---- .../catalog/glue/TrinoGlueCatalog.java | 2 - .../iceberg/catalog/hms/TrinoHiveCatalog.java | 2 - .../catalog/jdbc/TrinoJdbcCatalog.java | 2 - .../catalog/nessie/TrinoNessieCatalog.java | 4 +- .../iceberg/BaseIcebergConnectorTest.java | 28 +++++++ .../plugin/iceberg/TestIcebergConfig.java | 7 ++- .../TestIcebergTableWithObjectStore.java | 77 +++++++++++++++++++ .../trino/plugin/iceberg/TestIcebergV2.java | 40 ++++++++++ .../TestIcebergSparkCompatibility.java | 60 ++++++++++++--- 14 files changed, 297 insertions(+), 34 deletions(-) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 56f560c5a13..bee3447f80e 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -198,6 +198,11 @@ implementation is used: - Set to `false` to disable in-memory caching of metadata files on the coordinator. This cache is not used when `fs.cache.enabled` is set to true. - `true` +* - `iceberg.object-store.enabled` + - Set to `true` to enable Iceberg's [object store file layout](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout). Enabling the object store file layout appends a deterministic hash directly + after the data write path. + - `false` * - `iceberg.expire-snapshots.min-retention` - Minimal retention period for the [`expire_snapshot` command](iceberg-expire-snapshots). @@ -807,6 +812,8 @@ The following table properties can be updated after a table is created: - `format_version` - `partitioning` - `sorted_by` +- `object_store_enabled` +- `data_location` For example, to update a table from v1 of the Iceberg specification to v2: @@ -869,6 +876,10 @@ connector using a {doc}`WITH ` clause. - Comma-separated list of columns to use for Parquet bloom filter. It improves the performance of queries using Equality and IN predicates when reading Parquet files. Requires Parquet format. Defaults to `[]`. +* - `object_store_enabled` - Whether Iceberg's [object store file layout](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout) is enabled. +* - `data_location` - Optionally specifies the file system location URI for the table's data files. * - `extra_properties` - Additional properties added to an Iceberg table. The properties are not used by Trino, and are available in the `$properties` metadata table. 
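Before the code changes, a reviewer-oriented sketch of how the two new table properties are used. This mirrors the connector tests further down in the patch; the `iceberg.tpch` qualifier and the `local:///` paths are illustrative placeholders, not part of the patch:

```sql
-- Create a table with Iceberg's object store file layout enabled. Data files
-- are written under data_location with a deterministic hash segment inserted
-- after it; the tests below match paths of the form
-- local:///data-location/xyz/<hash>/tpch/<table>...
CREATE TABLE iceberg.tpch.test_layout
WITH (
    object_store_enabled = true,
    data_location = 'local:///data-location/xyz'
)
AS SELECT 1 AS val;

-- Both properties can also be changed on an existing table. Setting
-- data_location while the object store layout is disabled fails with
-- "Data location can only be set when object store is enabled".
ALTER TABLE iceberg.tpch.test_layout
SET PROPERTIES object_store_enabled = true, data_location = 'local:///data-location/xyz';

-- The hashed layout is visible through the $files metadata table.
SELECT file_path FROM iceberg.tpch."test_layout$files";
```

The hash spreads data files across many key prefixes, which is the point of Iceberg's object store layout on stores such as S3 that throttle requests per prefix.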
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index af71315d2e8..4d22822e8fa 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -91,6 +91,7 @@ public class IcebergConfig private List allowedExtraProperties = ImmutableList.of(); private boolean incrementalRefreshEnabled = true; private boolean metadataCacheEnabled = true; + private boolean objectStoreEnabled; public CatalogType getCatalogType() { @@ -519,4 +520,17 @@ public IcebergConfig setMetadataCacheEnabled(boolean metadataCacheEnabled) this.metadataCacheEnabled = metadataCacheEnabled; return this; } + + public boolean isObjectStoreEnabled() + { + return objectStoreEnabled; + } + + @Config("iceberg.object-store.enabled") + @ConfigDescription("Enable the Iceberg object store file layout") + public IcebergConfig setObjectStoreEnabled(boolean objectStoreEnabled) + { + this.objectStoreEnabled = objectStoreEnabled; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index b2b22536aaa..318421fe469 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -267,9 +267,11 @@ import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName; import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; +import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_ENABLED_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; @@ -357,6 +359,8 @@ import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED; +import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; import static org.apache.iceberg.expressions.Expressions.alwaysTrue; import static org.apache.iceberg.types.TypeUtil.indexParents; @@ -376,6 +380,8 @@ public class IcebergMetadata .add(EXTRA_PROPERTIES_PROPERTY) .add(FILE_FORMAT_PROPERTY) .add(FORMAT_VERSION_PROPERTY) + .add(OBJECT_STORE_ENABLED_PROPERTY) + .add(DATA_LOCATION_PROPERTY) .add(PARTITIONING_PROPERTY) .add(SORTED_BY_PROPERTY) .build(); @@ -2153,6 +2159,24 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion)); } + if 
(properties.containsKey(OBJECT_STORE_ENABLED_PROPERTY)) { + boolean objectStoreEnabled = (boolean) properties.get(OBJECT_STORE_ENABLED_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The object_store_enabled property cannot be empty")); + updateProperties.set(OBJECT_STORE_ENABLED, Boolean.toString(objectStoreEnabled)); + } + + if (properties.containsKey(DATA_LOCATION_PROPERTY)) { + String dataLocation = (String) properties.get(DATA_LOCATION_PROPERTY) + .orElseThrow(() -> new IllegalArgumentException("The data_location property cannot be empty")); + boolean objectStoreEnabled = (boolean) properties.getOrDefault( + OBJECT_STORE_ENABLED_PROPERTY, + Optional.of(Boolean.parseBoolean(icebergTable.properties().get(OBJECT_STORE_ENABLED)))).orElseThrow(); + if (!objectStoreEnabled) { + throw new TrinoException(INVALID_TABLE_PROPERTY, "Data location can only be set when object store is enabled"); + } + updateProperties.set(WRITE_DATA_LOCATION, dataLocation); + } + try { updateProperties.commit(); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java index ca967be9652..bdee60139ed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java @@ -35,6 +35,7 @@ import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX; import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static io.trino.spi.session.PropertyMetadata.booleanProperty; import static io.trino.spi.session.PropertyMetadata.doubleProperty; import static io.trino.spi.session.PropertyMetadata.enumProperty; import static io.trino.spi.session.PropertyMetadata.integerProperty; @@ -58,6 +59,8 @@ public class IcebergTableProperties public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns"; public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp"; public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns"; + public static final String OBJECT_STORE_ENABLED_PROPERTY = "object_store_enabled"; + public static final String DATA_LOCATION_PROPERTY = "data_location"; public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties"; public static final Set SUPPORTED_PROPERTIES = ImmutableSet.builder() @@ -68,6 +71,8 @@ public class IcebergTableProperties .add(FORMAT_VERSION_PROPERTY) .add(ORC_BLOOM_FILTER_COLUMNS_PROPERTY) .add(ORC_BLOOM_FILTER_FPP_PROPERTY) + .add(OBJECT_STORE_ENABLED_PROPERTY) + .add(DATA_LOCATION_PROPERTY) .add(EXTRA_PROPERTIES_PROPERTY) .add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY) .build(); @@ -175,6 +180,16 @@ public IcebergTableProperties( .collect(toImmutableMap(entry -> entry.getKey().toLowerCase(ENGLISH), Map.Entry::getValue)); }, value -> value)) + .add(booleanProperty( + OBJECT_STORE_ENABLED_PROPERTY, + "Set to true to enable Iceberg object store file layout", + icebergConfig.isObjectStoreEnabled(), + false)) + .add(stringProperty( + DATA_LOCATION_PROPERTY, + "File system location URI for the table's data files", + null, + false)) .build(); checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream() @@ -249,6 +264,16 @@ public static List getParquetBloomFilterColumns(Map tabl return parquetBloomFilterColumns == null ? 
ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns); } + public static boolean getObjectStoreEnabled(Map tableProperties) + { + return (Boolean) tableProperties.get(OBJECT_STORE_ENABLED_PROPERTY); + } + + public static Optional getDataLocation(Map tableProperties) + { + return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY)); + } + public static Optional> getExtraProperties(Map tableProperties) { return Optional.ofNullable((Map) tableProperties.get(EXTRA_PROPERTIES_PROPERTY)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index e9d74edc35f..90b39fa120b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -119,9 +119,11 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; +import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_ENABLED_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY; @@ -159,6 +161,7 @@ import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid; +import static java.lang.Boolean.parseBoolean; import static java.lang.Double.parseDouble; import static java.lang.Float.floatToRawIntBits; import static java.lang.Float.parseFloat; @@ -173,13 +176,11 @@ import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED; import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT; -import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS; import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; -import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; import static org.apache.iceberg.types.Type.TypeID.BINARY; import static org.apache.iceberg.types.Type.TypeID.FIXED; import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash; @@ -334,6 +335,13 @@ public static Map getIcebergTableProperties(Table icebergTable) properties.put(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY, ImmutableList.copyOf(parquetBloomFilterColumns)); } + if (parseBoolean(icebergTable.properties().getOrDefault(OBJECT_STORE_ENABLED, "false"))) { + 
properties.put(OBJECT_STORE_ENABLED_PROPERTY, true); + } + + Optional dataLocation = Optional.ofNullable(icebergTable.properties().get(WRITE_DATA_LOCATION)); + dataLocation.ifPresent(location -> properties.put(DATA_LOCATION_PROPERTY, location)); + return properties.buildOrThrow(); } @@ -842,6 +850,18 @@ public static Map createTableProperties(ConnectorTableMetadata t propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString()); propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties()))); + boolean objectStoreEnabled = IcebergTableProperties.getObjectStoreEnabled(tableMetadata.getProperties()); + if (objectStoreEnabled) { + propertiesBuilder.put(OBJECT_STORE_ENABLED, "true"); + } + Optional dataLocation = IcebergTableProperties.getDataLocation(tableMetadata.getProperties()); + dataLocation.ifPresent(location -> { + if (!objectStoreEnabled) { + throw new TrinoException(INVALID_TABLE_PROPERTY, "Data location can only be set when object store is enabled"); + } + propertiesBuilder.put(WRITE_DATA_LOCATION, location); + }); + // iceberg ORC format bloom filter properties used by create table List orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties()); if (!orcBloomFilterColumns.isEmpty()) { @@ -965,17 +985,6 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis) .snapshotId(); } - public static void validateTableCanBeDropped(Table table) - { - // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 - if (table.properties().containsKey(OBJECT_STORE_PATH) || - table.properties().containsKey("write.folder-storage.path") || // Removed from Iceberg as of 0.14.0, but preserved for backward compatibility - table.properties().containsKey(WRITE_METADATA_LOCATION) || - table.properties().containsKey(WRITE_DATA_LOCATION)) { - throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name()); - } - } - private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName) { if (actualStorageFormat != expectedStorageFormat) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index a0a998ac75d..9d39fa32eb6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -149,7 +149,6 @@ import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; -import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput; @@ -674,7 +673,6 @@ private Optional> getCachedColumnMetadata(com.amazonaws.ser public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { BaseTable table = (BaseTable) loadTable(session, schemaTableName); - validateTableCanBeDropped(table); try { 
deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 31fd010bfda..45e749fa529 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -103,7 +103,6 @@ import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; -import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER; import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT; @@ -385,7 +384,6 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { BaseTable table = (BaseTable) loadTable(session, schemaTableName); TableMetadata metadata = table.operations().current(); - validateTableCanBeDropped(table); io.trino.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(schemaTableName)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index e9658218234..1d8daf1effd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -81,7 +81,6 @@ import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; -import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; import static java.util.Locale.ENGLISH; @@ -319,7 +318,6 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName) public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { BaseTable table = (BaseTable) loadTable(session, schemaTableName); - validateTableCanBeDropped(table); jdbcCatalog.dropTable(toIdentifier(schemaTableName), false); try { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 4f42fe33ed5..240e41f4cdd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -64,7 +64,6 @@ import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; -import static 
io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.SchemaTableName.schemaTableName; @@ -232,8 +231,7 @@ public Map> tryGetColumnMetadata(Connector @Override public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { - BaseTable table = (BaseTable) loadTable(session, schemaTableName); - validateTableCanBeDropped(table); + loadTable(session, schemaTableName); nessieClient.dropTable(toIdentifier(schemaTableName), true); // The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data. // Nessie GC tool can be used to clean up the expired data. diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 0cd6b86c664..4382ac78969 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -8536,6 +8536,34 @@ public void testSetIllegalExtraPropertyKey() } } + @Test + public void testObjectStoreEnabledAndDataLocation() + throws Exception + { + String tableName = "test_object_store_enabled_data_location" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (object_store_enabled = true, data_location = 'local:///data-location/xyz') AS SELECT 1 AS val", 1); + + Location tableLocation = Location.of(getTableLocation(tableName)); + assertThat(fileSystem.directoryExists(tableLocation).get()).isTrue(); + + String filePath = (String) computeScalar("SELECT file_path FROM \"" + tableName + "$files\""); + Location dataFileLocation = Location.of(filePath); + assertThat(fileSystem.newInputFile(dataFileLocation).exists()).isTrue(); + assertThat(filePath).matches("local:///data-location/xyz/.{6}/tpch/%s.*".formatted(tableName)); + + assertUpdate("DROP TABLE " + tableName); + assertThat(fileSystem.newInputFile(dataFileLocation).exists()).isFalse(); + assertThat(fileSystem.newInputFile(tableLocation).exists()).isFalse(); + } + + @Test + public void testCreateTableWithDataLocationButObjectStoreDisabled() + { + assertQueryFails( + "CREATE TABLE test_data_location WITH (data_location = 'local:///data-location/xyz') AS SELECT 1 AS val", + "Data location can only be set when object store is enabled"); + } + @Override protected Optional filterSetColumnTypesDataProvider(SetColumnTypeSetup setup) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 1ec49403515..3ff0ce4588b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -74,7 +74,8 @@ public void testDefaults() .setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2) .setAllowedExtraProperties(ImmutableList.of()) .setIncrementalRefreshEnabled(true) - .setMetadataCacheEnabled(true)); + .setMetadataCacheEnabled(true) + .setObjectStoreEnabled(false)); } @Test @@ -111,6 +112,7 @@ public void testExplicitPropertyMappings() .put("iceberg.allowed-extra-properties", "propX,propY")
.put("iceberg.incremental-refresh-enabled", "false") .put("iceberg.metadata-cache.enabled", "false") + .put("iceberg.object-store.enabled", "true") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -143,7 +146,9 @@ public void testExplicitPropertyMappings() .setSplitManagerThreads(42) .setAllowedExtraProperties(ImmutableList.of("propX", "propY")) .setIncrementalRefreshEnabled(false) - .setMetadataCacheEnabled(false); + .setMetadataCacheEnabled(false) + .setIncrementalRefreshEnabled(false) + .setObjectStoreEnabled(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java new file mode 100644 index 00000000000..f2fd81e58a6 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.metastore.HiveMetastore; +import io.trino.metastore.Table; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; +import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static org.assertj.core.api.Assertions.assertThat; + +final class TestIcebergTableWithObjectStore + extends AbstractTestQueryFramework +{ + private HiveMetastore metastore; + private TrinoFileSystem fileSystem; + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = IcebergQueryRunner.builder() + .addIcebergProperty("iceberg.object-store.enabled", "true") + .build(); + + metastore = ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)).getInjector() + .getInstance(HiveMetastoreFactory.class) + .createMetastore(Optional.empty()); + + fileSystem = getFileSystemFactory(queryRunner).create(SESSION); + + return queryRunner; + } + + @Test + void testCreateTableWithDataLocation() + throws Exception + { + assertQuerySucceeds("CREATE TABLE test_create_table_with_different_location WITH (data_location = 'local:///table-location/abc') AS SELECT 1 AS val"); + Table table = metastore.getTable("tpch", "test_create_table_with_different_location").orElseThrow(); + assertThat(table.getTableType()).isEqualTo(EXTERNAL_TABLE.name()); + + Location tableLocation = Location.of(table.getStorage().getLocation()); + assertThat(fileSystem.newInputFile(tableLocation).exists()).isTrue(); + + String 
filePath = (String) computeScalar("SELECT file_path FROM \"test_create_table_with_different_location$files\""); + Location dataFileLocation = Location.of(filePath); + assertThat(fileSystem.newInputFile(dataFileLocation).exists()).isTrue(); + assertThat(filePath).matches("local:///table-location/abc/.{6}/tpch/test_create_table_with_different_location-.*/.*\\.parquet"); + + assertQuerySucceeds("DROP TABLE test_create_table_with_different_location"); + assertThat(metastore.getTable("tpch", "test_create_table_with_different_location")).isEmpty(); + assertThat(fileSystem.newInputFile(dataFileLocation).exists()).isFalse(); + assertThat(fileSystem.newInputFile(tableLocation).exists()).isFalse(); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index c1827f8298c..0a1b689abfa 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -182,6 +182,46 @@ public void testDefaultFormatVersion() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testSetPropertiesObjectStoreEnabled() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_object_store", "(x int) WITH (object_store_enabled = false)")) { + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .doesNotContain("object_store_enabled"); + assertThat(loadTable(table.getName()).properties()) + .doesNotContainKey("write.object-storage.enabled"); + + assertUpdate("ALTER TABLE " + table.getName() + " SET PROPERTIES object_store_enabled = true"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .contains("object_store_enabled = true"); + assertThat(loadTable(table.getName()).properties()) + .containsEntry("write.object-storage.enabled", "true"); + } + } + + @Test + public void testSetPropertiesDataLocation() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_data_location", "(x int)")) { + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .doesNotContain("data_location ="); + assertThat(loadTable(table.getName()).properties()) + .doesNotContainKey("write.data.path"); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " SET PROPERTIES data_location = 'local:///data-location'", + "Data location can only be set when object store is enabled"); + + assertUpdate("ALTER TABLE " + table.getName() + " SET PROPERTIES object_store_enabled = true, data_location = 'local:///data-location'"); + assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName())) + .contains("object_store_enabled = true") + .contains("data_location = 'local:///data-location'"); + assertThat(loadTable(table.getName()).properties()) + .containsEntry("write.object-storage.enabled", "true") + .containsEntry("write.data.path", "local:///data-location"); + } + } + @Test public void testV2TableRead() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index afa3660059c..925f3df24be 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ 
-708,6 +708,8 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag " doc_id STRING)\n" + " USING ICEBERG TBLPROPERTIES (" + " 'write.format.default'='%s'," + + " 'write.object-storage.enabled'=true," + + " 'write.data.path'='local:///write-data-path'," + " 'format-version' = %s," + " 'custom.table-property' = 'my_custom_value')", sparkTableName, @@ -719,6 +721,8 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag .contains( row("custom.table-property", "my_custom_value"), row("write.format.default", storageFormat.name()), + row("write.object-storage.enabled", "true"), + row("write.data.path", "local:///write-data-path"), row("owner", "hive")); onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); } @@ -730,9 +734,19 @@ public void testSparkReadingTrinoIcebergTablePropertiesData() String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); - onTrino().executeQuery("CREATE TABLE " + trinoTableName + " (doc_id VARCHAR) WITH (extra_properties = MAP(ARRAY['custom.table-property'], ARRAY['my_custom_value']))"); - - assertThat(onSpark().executeQuery("SHOW TBLPROPERTIES " + sparkTableName)).contains(row("custom.table-property", "my_custom_value")); + onTrino().executeQuery( + "CREATE TABLE " + trinoTableName + " (doc_id VARCHAR)\n" + + " WITH (" + + " object_store_enabled = true," + + " data_location = 'local:///write-data-path'," + + " extra_properties = MAP(ARRAY['custom.table-property'], ARRAY['my_custom_value'])" + + " )"); + + assertThat(onSpark().executeQuery("SHOW TBLPROPERTIES " + sparkTableName)) + .contains( + row("custom.table-property", "my_custom_value"), + row("write.object-storage.enabled", "true"), + row("write.data.path", "local:///write-data-path")); onTrino().executeQuery("DROP TABLE " + trinoTableName); } @@ -1150,10 +1164,7 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1); assertThat(((String) queryResult.getOnlyValue())).contains(dataPath); - // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 - assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName)) - .hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino"); - onSpark().executeQuery("DROP TABLE " + sparkTableName); + onTrino().executeQuery("DROP TABLE " + trinoTableName); } @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion") @@ -1179,9 +1190,38 @@ public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageForma assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1); assertThat(((String) queryResult.getOnlyValue())).contains(dataPath); - assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName)) - .hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino"); - onSpark().executeQuery("DROP TABLE " + sparkTableName); + onTrino().executeQuery("DROP TABLE " + trinoTableName); + } + + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion") + public void testSparkReadingTrinoObjectStorage(StorageFormat storageFormat, int specVersion) + { + String baseTableName = toLowerCase("test_trino_object_storage_location_provider_" + storageFormat); + String sparkTableName = 
sparkTableName(baseTableName); + String trinoTableName = trinoTableName(baseTableName); + String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_trino_object_storage_location_provider/obj-data"; + + onTrino().executeQuery(format( + "CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (" + + "object_store_enabled = true," + + "data_location = '%s'," + + "format = '%s'," + + "format_version = %s)", + trinoTableName, + dataPath, + storageFormat, + specVersion)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName)); + + Row result = row("a_string", 1000000000000000L); + assertThat(onSpark().executeQuery("SELECT _string, _bigint FROM " + sparkTableName)).containsOnly(result); + assertThat(onTrino().executeQuery("SELECT _string, _bigint FROM " + trinoTableName)).containsOnly(result); + + QueryResult queryResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\""))); + assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1); + assertThat(((String) queryResult.getOnlyValue())).contains(dataPath); + + onTrino().executeQuery("DROP TABLE " + trinoTableName); } private static final List SPECIAL_CHARACTER_VALUES = ImmutableList.of(
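As a closing note on the cross-engine mapping asserted in the compatibility tests above: the Trino table property `object_store_enabled` surfaces as the Iceberg table property `write.object-storage.enabled`, and `data_location` as `write.data.path`. A sketch of the equivalent Spark-side DDL, with illustrative catalog and table names:

```sql
-- Spark-side declaration of the same layout that Trino creates with
-- WITH (object_store_enabled = true, data_location = 'local:///write-data-path')
CREATE TABLE spark_catalog.default.doc_example (doc_id STRING)
USING ICEBERG
TBLPROPERTIES (
    'write.object-storage.enabled' = true,
    'write.data.path' = 'local:///write-data-path'
);
```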