Skip to content

Commit

Permalink
Limit session state during metadata queries in Iceberg
Browse files Browse the repository at this point in the history
Metadata queries such as `information_schema.columns`,
`system.jdbc.columns` or `system.metadata.table_comments` may end up
loading arbitrary number of relations within single query (transaction).
It is important to bound memory usage for such queries.

In case of Iceberg Hive metastore based catalog, this is already done in
`TrinoHiveCatalogFactory` bu means of configuring per-query
`CachingHiveMetastore`. However, catalogs with explicit caching need
something similar.
  • Loading branch information
findepi committed Nov 16, 2023
1 parent 4f3d778 commit 8d4f26b
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 78 deletions.
7 changes: 7 additions & 0 deletions lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.trino.cache;

import com.google.common.cache.Cache;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;

import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -26,6 +28,11 @@ public final class CacheUtils
{
private CacheUtils() {}

/**
* @throws UncheckedExecutionException when {@code loader} throws an unchecked exception
* @throws ExecutionError when {@code loader} throws an {@link Error}
* @throws RuntimeException when{@code loader} throws a checked exception (which should not happen)
*/
public static <K, V> V uncheckedCacheGet(Cache<K, V> cache, K key, Supplier<V> loader)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
Expand All @@ -105,6 +104,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
Expand Down Expand Up @@ -180,9 +180,16 @@ public class TrinoGlueCatalog
// Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables.
.maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE))
.build();
private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
private final Map<SchemaTableName, ConnectorViewDefinition> viewCache = new ConcurrentHashMap<>();
private final Map<SchemaTableName, MaterializedViewData> materializedViewCache = new ConcurrentHashMap<>();

private final Cache<SchemaTableName, TableMetadata> tableMetadataCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();
private final Cache<SchemaTableName, ConnectorViewDefinition> viewCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();
private final Cache<SchemaTableName, MaterializedViewData> materializedViewCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();

public TrinoGlueCatalog(
CatalogName catalogName,
Expand Down Expand Up @@ -558,22 +565,30 @@ private void getCommentsFromIcebergMetadata(
@Override
public Table loadTable(ConnectorSession session, SchemaTableName table)
{
if (viewCache.containsKey(table) || materializedViewCache.containsKey(table)) {
if (viewCache.asMap().containsKey(table) || materializedViewCache.asMap().containsKey(table)) {
throw new TableNotFoundException(table);
}

TableMetadata metadata = tableMetadataCache.computeIfAbsent(
table,
ignore -> {
TableOperations operations = tableOperationsProvider.createTableOperations(
this,
session,
table.getSchemaName(),
table.getTableName(),
Optional.empty(),
Optional.empty());
return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current();
});
TableMetadata metadata;
try {
metadata = uncheckedCacheGet(
tableMetadataCache,
table,
() -> {
TableOperations operations = tableOperationsProvider.createTableOperations(
this,
session,
table.getSchemaName(),
table.getTableName(),
Optional.empty(),
Optional.empty());
return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current();
});
}
catch (UncheckedExecutionException e) {
throwIfUnchecked(e.getCause());
throw e;
}

return getIcebergTableWithMetadata(
this,
Expand Down Expand Up @@ -612,7 +627,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector

private Optional<List<ColumnMetadata>> getCachedColumnMetadata(SchemaTableName tableName)
{
if (!cacheTableMetadata || viewCache.containsKey(tableName) || materializedViewCache.containsKey(tableName)) {
if (!cacheTableMetadata || viewCache.asMap().containsKey(tableName) || materializedViewCache.asMap().containsKey(tableName)) {
return Optional.empty();
}

Expand Down Expand Up @@ -828,37 +843,41 @@ private Optional<com.amazonaws.services.glue.model.Table> getTableAndCacheMetada

String tableType = getTableType(table);
Map<String, String> parameters = getTableParameters(table);
if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) {
if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) {
if (isIcebergTable(parameters) && !tableMetadataCache.asMap().containsKey(schemaTableName)) {
if (viewCache.asMap().containsKey(schemaTableName) || materializedViewCache.asMap().containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Table cannot also be a view/materialized view");
}

String metadataLocation = parameters.get(METADATA_LOCATION_PROP);
try {
// Cache the TableMetadata while we have the Table retrieved anyway
tableMetadataCache.put(schemaTableName, TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation));
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation);
}
}
else if (isTrinoMaterializedView(tableType, parameters)) {
if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
if (viewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Materialized View cannot also be a table or view");
}

try {
ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table);
materializedViewCache.put(schemaTableName, new MaterializedViewData(
materializedView,
Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP))));
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(materializedViewCache, schemaTableName, () -> {
ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table);
return new MaterializedViewData(
materializedView,
Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP)));
});
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName);
}
}
else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTableName)) {
if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
else if (isTrinoView(tableType, parameters) && !viewCache.asMap().containsKey(schemaTableName)) {
if (materializedViewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. View cannot also be a materialized view or table");
}

Expand All @@ -868,7 +887,10 @@ else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTabl
tableType,
parameters,
Optional.ofNullable(table.getOwner()))
.ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition));
.ifPresent(viewDefinition -> {
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(viewCache, schemaTableName, () -> viewDefinition);
});
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache view from %s", schemaTableName);
Expand Down Expand Up @@ -957,7 +979,7 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaT
try {
com.amazonaws.services.glue.model.Table existingView = getTableAndCacheMetadata(session, source)
.orElseThrow(() -> new TableNotFoundException(source));
viewCache.remove(source);
viewCache.invalidate(source);
TableInput viewTableInput = getViewTableInput(
target.getTableName(),
existingView.getViewOriginalText(),
Expand Down Expand Up @@ -996,7 +1018,7 @@ public void dropView(ConnectorSession session, SchemaTableName schemaViewName)
}

try {
viewCache.remove(schemaViewName);
viewCache.invalidate(schemaViewName);
deleteTable(schemaViewName.getSchemaName(), schemaViewName.getTableName());
}
catch (AmazonServiceException e) {
Expand Down Expand Up @@ -1031,12 +1053,12 @@ public List<SchemaTableName> listViews(ConnectorSession session, Optional<String
@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
ConnectorViewDefinition cachedView = viewCache.get(viewName);
ConnectorViewDefinition cachedView = viewCache.getIfPresent(viewName);
if (cachedView != null) {
return Optional.of(cachedView);
}

if (tableMetadataCache.containsKey(viewName) || materializedViewCache.containsKey(viewName)) {
if (tableMetadataCache.asMap().containsKey(viewName) || materializedViewCache.asMap().containsKey(viewName)) {
// Entries in these caches are not views
return Optional.empty();
}
Expand Down Expand Up @@ -1257,7 +1279,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
if (!isTrinoMaterializedView(getTableType(view), getTableParameters(view))) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName());
}
materializedViewCache.remove(viewName);
materializedViewCache.invalidate(viewName);
dropStorageTable(session, view);
deleteTable(view.getDatabaseName(), view.getName());
}
Expand All @@ -1281,12 +1303,12 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g
@Override
protected Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
MaterializedViewData materializedViewData = materializedViewCache.get(viewName);
MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName);
if (materializedViewData != null) {
return Optional.of(materializedViewData.connectorMaterializedViewDefinition);
}

if (tableMetadataCache.containsKey(viewName) || viewCache.containsKey(viewName)) {
if (tableMetadataCache.asMap().containsKey(viewName) || viewCache.asMap().containsKey(viewName)) {
// Entries in these caches are not materialized views.
return Optional.empty();
}
Expand Down Expand Up @@ -1380,7 +1402,7 @@ private ConnectorMaterializedViewDefinition createMaterializedViewDefinition(
public Optional<BaseTable> getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName)
{
String storageMetadataLocation;
MaterializedViewData materializedViewData = materializedViewCache.get(viewName);
MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName);
if (materializedViewData == null) {
Optional<com.amazonaws.services.glue.model.Table> maybeTable = getTableAndCacheMetadata(session, viewName);
if (maybeTable.isEmpty()) {
Expand Down Expand Up @@ -1423,7 +1445,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session,
{
requireNonNull(storageTableName, "storageTableName is null");
requireNonNull(storageMetadataLocation, "storageMetadataLocation is null");
return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> {
return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation);
});
Expand All @@ -1436,7 +1458,7 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou
try {
com.amazonaws.services.glue.model.Table glueTable = getTableAndCacheMetadata(session, source)
.orElseThrow(() -> new TableNotFoundException(source));
materializedViewCache.remove(source);
materializedViewCache.invalidate(source);
Map<String, String> tableParameters = getTableParameters(glueTable);
if (!isTrinoMaterializedView(getTableType(glueTable), tableParameters)) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source);
Expand Down Expand Up @@ -1493,7 +1515,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.remove(schemaTableName);
tableMetadataCache.invalidate(schemaTableName);
}

com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
*/
package io.trino.plugin.iceberg.catalog.hms;

import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.log.Logger;
import io.trino.cache.EvictableCacheBuilder;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
Expand Down Expand Up @@ -69,14 +72,15 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
Expand Down Expand Up @@ -124,6 +128,7 @@ public class TrinoHiveCatalog
extends AbstractTrinoCatalog
{
private static final Logger log = Logger.get(TrinoHiveCatalog.class);
private static final int PER_QUERY_CACHE_SIZE = 1000;
public static final String DEPENDS_ON_TABLES = "dependsOnTables";
// Value should be ISO-8601 formatted time instant
public static final String TRINO_QUERY_START_TIME = "trino-query-start-time";
Expand All @@ -135,7 +140,9 @@ public class TrinoHiveCatalog
private final boolean deleteSchemaLocationsFallback;
private final boolean hideMaterializedViewStorageTable;

private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
private final Cache<SchemaTableName, TableMetadata> tableMetadataCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();

public TrinoHiveCatalog(
CatalogName catalogName,
Expand Down Expand Up @@ -458,9 +465,17 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
@Override
public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
{
TableMetadata metadata = tableMetadataCache.computeIfAbsent(
schemaTableName,
ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current());
TableMetadata metadata;
try {
metadata = uncheckedCacheGet(
tableMetadataCache,
schemaTableName,
() -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current());
}
catch (UncheckedExecutionException e) {
throwIfUnchecked(e.getCause());
throw e;
}

return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata);
}
Expand Down Expand Up @@ -838,7 +853,7 @@ public Optional<BaseTable> getMaterializedViewStorageTable(ConnectorSession sess

private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, SchemaTableName storageTableName, io.trino.plugin.hive.metastore.Table materializedView)
{
return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> {
return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> {
String storageMetadataLocation = materializedView.getParameters().get(METADATA_LOCATION_PROP);
checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + materializedView.getTableName());
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
Expand Down Expand Up @@ -895,6 +910,6 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.remove(schemaTableName);
tableMetadataCache.invalidate(schemaTableName);
}
}
Loading

0 comments on commit 8d4f26b

Please sign in to comment.