Skip to content

Commit

Permalink
Add support for object store file layout in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelong95 authored and ebyhr committed Nov 22, 2024
1 parent bb190a0 commit cd1965d
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 34 deletions.
11 changes: 11 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ implementation is used:
- Set to `false` to disable in-memory caching of metadata files on the
coordinator. This cache is not used when `fs.cache.enabled` is set to true.
- `true`
* - `iceberg.object-store.enabled`
- Set to `true` to enable Iceberg's [object store file layout](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout).
Enabling the object store file layout appends a deterministic hash directly
after the data write path.
- `false`
* - `iceberg.expire-snapshots.min-retention`
- Minimal retention period for the
[`expire_snapshot` command](iceberg-expire-snapshots).
Expand Down Expand Up @@ -807,6 +812,8 @@ The following table properties can be updated after a table is created:
- `format_version`
- `partitioning`
- `sorted_by`
- `object_store_enabled`
- `data_location`

For example, to update a table from v1 of the Iceberg specification to v2:

Expand Down Expand Up @@ -869,6 +876,10 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
- Comma-separated list of columns to use for Parquet bloom filter. It improves
the performance of queries using Equality and IN predicates when reading
Parquet files. Requires Parquet format. Defaults to `[]`.
* - `object_store_enabled`
- Whether Iceberg's [object store file layout](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout) is enabled.
* - `data_location`
- Optionally specifies the file system location URI for the table's data files.
* - `extra_properties`
- Additional properties added to an Iceberg table. The properties are not used by Trino,
and are available in the `$properties` metadata table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class IcebergConfig
private List<String> allowedExtraProperties = ImmutableList.of();
private boolean incrementalRefreshEnabled = true;
private boolean metadataCacheEnabled = true;
private boolean objectStoreEnabled;

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -519,4 +520,17 @@ public IcebergConfig setMetadataCacheEnabled(boolean metadataCacheEnabled)
this.metadataCacheEnabled = metadataCacheEnabled;
return this;
}

/**
 * Returns whether Iceberg's object store file layout is enabled by default
 * for newly created tables. Backed by the {@code iceberg.object-store.enabled}
 * configuration property (see the setter below); defaults to {@code false}.
 */
public boolean isObjectStoreEnabled()
{
    return objectStoreEnabled;
}

/**
 * Sets the catalog-wide default for Iceberg's object store file layout,
 * applied to new tables unless overridden per table via the
 * {@code object_store_enabled} table property.
 *
 * @param objectStoreEnabled true to enable the object store file layout by default
 * @return this config instance, for fluent chaining
 */
@Config("iceberg.object-store.enabled")
@ConfigDescription("Enable the Iceberg object store file layout")
public IcebergConfig setObjectStoreEnabled(boolean objectStoreEnabled)
{
    this.objectStoreEnabled = objectStoreEnabled;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,11 @@
import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName;
import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
Expand Down Expand Up @@ -357,6 +359,8 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
import static org.apache.iceberg.types.TypeUtil.indexParents;
Expand All @@ -376,6 +380,8 @@ public class IcebergMetadata
.add(EXTRA_PROPERTIES_PROPERTY)
.add(FILE_FORMAT_PROPERTY)
.add(FORMAT_VERSION_PROPERTY)
.add(OBJECT_STORE_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(PARTITIONING_PROPERTY)
.add(SORTED_BY_PROPERTY)
.build();
Expand Down Expand Up @@ -2153,6 +2159,24 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
}

if (properties.containsKey(OBJECT_STORE_ENABLED_PROPERTY)) {
boolean objectStoreEnabled = (boolean) properties.get(OBJECT_STORE_ENABLED_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The object_store_enabled property cannot be empty"));
updateProperties.set(OBJECT_STORE_ENABLED, Boolean.toString(objectStoreEnabled));
}

if (properties.containsKey(DATA_LOCATION_PROPERTY)) {
String dataLocation = (String) properties.get(DATA_LOCATION_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The data_location property cannot be empty"));
boolean objectStoreEnabled = (boolean) properties.getOrDefault(
OBJECT_STORE_ENABLED_PROPERTY,
Optional.of(Boolean.parseBoolean(icebergTable.properties().get(OBJECT_STORE_ENABLED)))).orElseThrow();
if (!objectStoreEnabled) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Data location can only be set when object store is enabled");
}
updateProperties.set(WRITE_DATA_LOCATION, dataLocation);
}

try {
updateProperties.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
Expand All @@ -58,6 +59,8 @@ public class IcebergTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_ENABLED_PROPERTY = "object_store_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";

public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
Expand All @@ -68,6 +71,8 @@ public class IcebergTableProperties
.add(FORMAT_VERSION_PROPERTY)
.add(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)
.add(ORC_BLOOM_FILTER_FPP_PROPERTY)
.add(OBJECT_STORE_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(EXTRA_PROPERTIES_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.build();
Expand Down Expand Up @@ -175,6 +180,16 @@ public IcebergTableProperties(
.collect(toImmutableMap(entry -> entry.getKey().toLowerCase(ENGLISH), Map.Entry::getValue));
},
value -> value))
.add(booleanProperty(
OBJECT_STORE_ENABLED_PROPERTY,
"Set to true to enable Iceberg object store file layout",
icebergConfig.isObjectStoreEnabled(),
false))
.add(stringProperty(
DATA_LOCATION_PROPERTY,
"File system location URI for the table's data files",
null,
false))
.build();

checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()
Expand Down Expand Up @@ -249,6 +264,16 @@ public static List<String> getParquetBloomFilterColumns(Map<String, Object> tabl
return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
}

/**
 * Reads the {@code object_store_enabled} table property.
 *
 * @param tableProperties resolved table properties map
 * @return true if the Iceberg object store file layout is enabled; false when
 *         the property is absent. The property normally carries a default
 *         supplied by {@code IcebergConfig}, but treating a missing entry as
 *         false avoids an unboxing NullPointerException from a raw
 *         {@code (Boolean) tableProperties.get(...)} cast.
 */
public static boolean getObjectStoreEnabled(Map<String, Object> tableProperties)
{
    Boolean objectStoreEnabled = (Boolean) tableProperties.get(OBJECT_STORE_ENABLED_PROPERTY);
    return objectStoreEnabled != null && objectStoreEnabled;
}

/**
 * Reads the optional {@code data_location} table property.
 *
 * @param tableProperties resolved table properties map
 * @return the file system location URI configured for the table's data files,
 *         or empty when the property is not set
 */
public static Optional<String> getDataLocation(Map<String, Object> tableProperties)
{
    String dataLocation = (String) tableProperties.get(DATA_LOCATION_PROPERTY);
    return Optional.ofNullable(dataLocation);
}

public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES_PROPERTY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
Expand Down Expand Up @@ -159,6 +161,7 @@
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static java.lang.Boolean.parseBoolean;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.parseFloat;
Expand All @@ -173,13 +176,11 @@
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
Expand Down Expand Up @@ -334,6 +335,13 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
properties.put(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY, ImmutableList.copyOf(parquetBloomFilterColumns));
}

if (parseBoolean(icebergTable.properties().getOrDefault(OBJECT_STORE_ENABLED, "false"))) {
properties.put(OBJECT_STORE_ENABLED_PROPERTY, true);
}

Optional<String> dataLocation = Optional.ofNullable(icebergTable.properties().get(WRITE_DATA_LOCATION));
dataLocation.ifPresent(location -> properties.put(DATA_LOCATION_PROPERTY, location));

return properties.buildOrThrow();
}

Expand Down Expand Up @@ -842,6 +850,18 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));

boolean objectStoreEnabled = IcebergTableProperties.getObjectStoreEnabled(tableMetadata.getProperties());
if (objectStoreEnabled) {
propertiesBuilder.put(OBJECT_STORE_ENABLED, "true");
}
Optional<String> dataLocation = IcebergTableProperties.getDataLocation(tableMetadata.getProperties());
dataLocation.ifPresent(location -> {
if (!objectStoreEnabled) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Data location can only be set when object store is enabled");
}
propertiesBuilder.put(WRITE_DATA_LOCATION, location);
});

// iceberg ORC format bloom filter properties used by create table
List<String> orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
if (!orcBloomFilterColumns.isEmpty()) {
Expand Down Expand Up @@ -965,17 +985,6 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
.snapshotId();
}

public static void validateTableCanBeDropped(Table table)
{
// TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
if (table.properties().containsKey(OBJECT_STORE_PATH) ||
table.properties().containsKey("write.folder-storage.path") || // Removed from Iceberg as of 0.14.0, but preserved for backward compatibility
table.properties().containsKey(WRITE_METADATA_LOCATION) ||
table.properties().containsKey(WRITE_DATA_LOCATION)) {
throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name());
}
}

private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
{
if (actualStorageFormat != expectedStorageFormat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput;
Expand Down Expand Up @@ -674,7 +673,6 @@ private Optional<List<ColumnMetadata>> getCachedColumnMetadata(com.amazonaws.ser
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
try {
deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
Expand Down Expand Up @@ -385,7 +384,6 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
TableMetadata metadata = table.operations().current();
validateTableCanBeDropped(table);

io.trino.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -319,7 +318,6 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);

jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
Expand Down Expand Up @@ -232,8 +231,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
loadTable(session, schemaTableName);
nessieClient.dropTable(toIdentifier(schemaTableName), true);
// The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data.
// Nessie GC tool can be used to clean up the expired data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8536,6 +8536,34 @@ public void testSetIllegalExtraPropertyKey()
}
}

@Test
public void testObjectStoreEnabledAndDataLocation()
        throws Exception
{
    // Create a table using the object store layout, with data files redirected
    // to a custom data_location outside the table directory
    String table = "test_object_store_enabled_data_location" + randomNameSuffix();
    assertUpdate(
            "CREATE TABLE %s WITH (object_store_enabled = true, data_location = 'local:///data-location/xyz') AS SELECT 1 AS val".formatted(table),
            1);

    Location tableDir = Location.of(getTableLocation(table));
    assertThat(fileSystem.directoryExists(tableDir).get()).isTrue();

    // The data file must exist under data_location, with an injected path
    // segment (the regex's .{6} component) between the prefix and the table name
    String dataFilePath = (String) computeScalar("SELECT file_path FROM \"" + table + "$files\"");
    Location dataFile = Location.of(dataFilePath);
    assertThat(fileSystem.newInputFile(dataFile).exists()).isTrue();
    assertThat(dataFilePath).matches("local:///data-location/xyz/.{6}/tpch/%s.*".formatted(table));

    // Dropping the table must clean up both the table directory and the
    // externally located data file
    assertUpdate("DROP TABLE " + table);
    assertThat(fileSystem.newInputFile(dataFile).exists()).isFalse();
    assertThat(fileSystem.newInputFile(tableDir).exists()).isFalse();
}

@Test
public void testCreateTableWithDataLocationButObjectStoreDisabled()
{
    // data_location is only meaningful with the object store file layout;
    // without object_store_enabled = true the CREATE TABLE must be rejected
    assertQueryFails(
            "CREATE TABLE test_data_location WITH (data_location = 'local:///data-location/xyz') AS SELECT 1 AS val",
            "Data location can only be set when object store is enabled");
}

@Override
protected Optional<SetColumnTypeSetup> filterSetColumnTypesDataProvider(SetColumnTypeSetup setup)
{
Expand Down
Loading

0 comments on commit cd1965d

Please sign in to comment.