Improve performance when listing columns in Iceberg
shohamyamin committed Dec 19, 2024
1 parent a99d96e commit 211f97a
Showing 7 changed files with 81 additions and 34 deletions.
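In outline: IcebergMetadata's single executor field is split into a systemTableExecutor (for Iceberg system tables such as snapshots and files) and a new metadataFetchingExecutor, and streamTableColumns now loads the schemas of the remaining tables as parallel Callable tasks via processWithAdditionalThreads instead of a sequential loop. The parallelism comes from IcebergConfig.getMetadataParallelism(): a value of 1 keeps the work on the calling thread via directExecutor(), while larger values are bounded by an airlift BoundedExecutor over a shared executor service.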
----------------------------------------
@@ -196,10 +196,14 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -228,6 +232,7 @@
import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
+import static io.trino.plugin.base.util.ExecutorUtil.processWithAdditionalThreads;
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL;
import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION;
@@ -406,7 +411,8 @@ public class IcebergMetadata
private final Optional<HiveMetastoreFactory> metastoreFactory;
private final boolean addFilesProcedureEnabled;
private final Predicate<String> allowedExtraProperties;
-private final ExecutorService executor;
+private final ExecutorService systemTableExecutor;
+private final Executor metadataFetchingExecutor;

private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();

@@ -423,7 +429,8 @@ public IcebergMetadata(
Optional<HiveMetastoreFactory> metastoreFactory,
boolean addFilesProcedureEnabled,
Predicate<String> allowedExtraProperties,
-ExecutorService executor)
+ExecutorService systemTableExecutor,
+Executor metadataFetchingExecutor)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
@@ -434,7 +441,8 @@ public IcebergMetadata(
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
this.addFilesProcedureEnabled = addFilesProcedureEnabled;
this.allowedExtraProperties = requireNonNull(allowedExtraProperties, "allowedExtraProperties is null");
-this.executor = requireNonNull(executor, "executor is null");
+this.systemTableExecutor = requireNonNull(systemTableExecutor, "systemTableExecutor is null");
+this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null");
}

@Override
@@ -694,14 +702,14 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
return switch (tableType) {
case DATA, MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected table type: " + tableType); // Handled above.
case HISTORY -> Optional.of(new HistoryTable(tableName, table));
-case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table, executor));
-case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table, executor));
-case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), executor));
-case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, executor));
+case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table, systemTableExecutor));
+case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table, systemTableExecutor));
+case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), systemTableExecutor));
+case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, systemTableExecutor));
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
-case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), executor));
+case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), systemTableExecutor));
case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
-case REFS -> Optional.of(new RefsTable(tableName, table, executor));
+case REFS -> Optional.of(new RefsTable(tableName, table, systemTableExecutor));
};
}

@@ -981,23 +989,40 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
tableMetadatas.add(TableColumnsMetadata.forTable(tableName, columns));
});

-for (SchemaTableName tableName : remainingTables) {
-try {
-Table icebergTable = catalog.loadTable(session, tableName);
-List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema(), typeManager);
-tableMetadatas.add(TableColumnsMetadata.forTable(tableName, columns));
-}
-catch (TableNotFoundException e) {
-// Table disappeared during listing operation
-}
-catch (UnknownTableTypeException e) {
-// Skip unsupported table type in case that the table redirects are not enabled
-}
-catch (RuntimeException e) {
-// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
-log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
-}
+List<Callable<TableColumnsMetadata>> tasks = remainingTables.stream()
+.map(tableName -> (Callable<TableColumnsMetadata>) () -> {
+try {
+Table icebergTable = catalog.loadTable(session, tableName);
+List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema(), typeManager);
+return TableColumnsMetadata.forTable(tableName, columns);
+}
+catch (TableNotFoundException e) {
+// Table disappeared during listing operation
+return null;
+}
+catch (UnknownTableTypeException e) {
+// Skip unsupported table type in case that the table redirects are not enabled
+return null;
+}
+catch (RuntimeException e) {
+// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
+log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
+return null;
+}
+})
+.collect(toImmutableList());
+
+try {
+List<TableColumnsMetadata> taskResults = processWithAdditionalThreads(tasks, metadataFetchingExecutor).stream()
+.filter(Objects::nonNull) // Filter out null results
+.collect(toImmutableList());
+
+tableMetadatas.addAll(taskResults);
+}
+catch (ExecutionException e) {
+throw new RuntimeException(e.getCause());
+}

return tableMetadatas.build();
})
.flatMap(List::stream)
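
For illustration, a minimal JDK-only sketch of the fan-out pattern introduced above. The class name and example data are hypothetical, and the real code uses Trino's ExecutorUtil.processWithAdditionalThreads rather than ExecutorService.invokeAll, but the shape is the same: submit one Callable per table, wait for all of them, drop the nulls left by vanished tables.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFetchSketch
{
    // Fan out one task per table, wait for all of them, and drop the nulls
    // returned by tasks whose table disappeared mid-listing.
    static List<String> fetchAll(List<Callable<String>> tasks, ExecutorService executor)
            throws InterruptedException, ExecutionException
    {
        List<String> results = new ArrayList<>();
        for (Future<String> future : executor.invokeAll(tasks)) {
            String columns = future.get(); // rethrows a task failure as ExecutionException
            if (columns != null) {
                results.add(columns);
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception
    {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = List.of(
                    () -> "sales.orders: [id bigint, total double]",
                    () -> null); // simulates a table dropped during listing
            System.out.println(fetchAll(tasks, executor));
        }
        finally {
            executor.shutdown();
        }
    }
}

Returning null from a task instead of throwing keeps one vanished table from failing the whole listing, which is exactly how the rewritten streamTableColumns handles TableNotFoundException.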
----------------------------------------
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
+import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
@@ -25,9 +26,11 @@
import io.trino.spi.type.TypeManager;

import java.util.Optional;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;

+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

public class IcebergMetadataFactory
@@ -41,7 +44,8 @@ public class IcebergMetadataFactory
private final Optional<HiveMetastoreFactory> metastoreFactory;
private final boolean addFilesProcedureEnabled;
private final Predicate<String> allowedExtraProperties;
-private final ExecutorService executor;
+private final ExecutorService systemTableExecutor;
+private final Executor metadataFetchingExecutor;

@Inject
public IcebergMetadataFactory(
@@ -52,7 +56,8 @@ public IcebergMetadataFactory(
IcebergFileSystemFactory fileSystemFactory,
TableStatisticsWriter tableStatisticsWriter,
@RawHiveMetastoreFactory Optional<HiveMetastoreFactory> metastoreFactory,
-@ForIcebergScanPlanning ExecutorService executor,
+@ForIcebergScanPlanning ExecutorService systemTableExecutor,
+@ForIcebergMetadata ExecutorService metadataExecutorService,
IcebergConfig config)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -62,14 +67,21 @@ public IcebergMetadataFactory(
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
-this.executor = requireNonNull(executor, "executor is null");
+this.systemTableExecutor = requireNonNull(systemTableExecutor, "systemTableExecutor is null");
this.addFilesProcedureEnabled = config.isAddFilesProcedureEnabled();
if (config.getAllowedExtraProperties().equals(ImmutableList.of("*"))) {
this.allowedExtraProperties = _ -> true;
}
else {
this.allowedExtraProperties = ImmutableSet.copyOf(requireNonNull(config.getAllowedExtraProperties(), "allowedExtraProperties is null"))::contains;
}

+if (config.getMetadataParallelism() == 1) {
+this.metadataFetchingExecutor = directExecutor();
+}
+else {
+this.metadataFetchingExecutor = new BoundedExecutor(metadataExecutorService, config.getMetadataParallelism());
+}
}

public IcebergMetadata create(ConnectorIdentity identity)
@@ -84,6 +96,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
metastoreFactory,
addFilesProcedureEnabled,
allowedExtraProperties,
-executor);
+systemTableExecutor,
+metadataFetchingExecutor);
}
}
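
A minimal sketch of the executor selection above, assuming only what the diff shows: Guava's directExecutor() runs each task on the calling thread, so parallelism 1 avoids any thread handoff, while airlift's BoundedExecutor caps how many tasks may run concurrently on the shared pool. The class name here is hypothetical.

import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.BoundedExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

class MetadataExecutorChoice
{
    // Same decision as the IcebergMetadataFactory constructor: a direct
    // executor for parallelism 1, a bounded view of the shared pool otherwise.
    static Executor choose(ExecutorService sharedPool, int parallelism)
    {
        if (parallelism == 1) {
            return MoreExecutors.directExecutor();
        }
        return new BoundedExecutor(sharedPool, parallelism);
    }
}

This lets one underlying pool back all metadata fetching while each factory stays within its configured parallelism.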
----------------------------------------
@@ -48,6 +48,7 @@
import java.util.Optional;
import java.util.UUID;

+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.metastore.TableInfo.ExtendedRelationType.TABLE;
@@ -122,7 +123,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
-newDirectExecutorService());
+newDirectExecutorService(),
+directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isFalse();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
----------------------------------------
@@ -141,7 +141,8 @@ public void testNonLowercaseGlueDatabase()
Optional.empty(),
false,
_ -> false,
-newDirectExecutorService());
+newDirectExecutorService(),
+directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
.isFalse();
assertThat(icebergMetadata.schemaExists(SESSION, trinoSchemaName)).as("icebergMetadata.schemaExists(trinoSchemaName)")
----------------------------------------
@@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;

+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
@@ -191,7 +192,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
-newDirectExecutorService());
+newDirectExecutorService(),
+directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
----------------------------------------
@@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;

+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.metastore.TableInfo.ExtendedRelationType.OTHER_VIEW;
@@ -129,7 +130,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
-newDirectExecutorService());
+newDirectExecutorService(),
+directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
----------------------------------------
@@ -58,6 +58,7 @@
import java.util.Map;
import java.util.Optional;

+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.plugin.iceberg.catalog.snowflake.TestIcebergSnowflakeCatalogConnectorSmokeTest.S3_ACCESS_KEY;
@@ -225,7 +226,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
-newDirectExecutorService());
+newDirectExecutorService(),
+directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
