Skip to content

Commit

Permalink
Move nested namespace support in REST catalog behind a config
Browse files Browse the repository at this point in the history
  • Loading branch information
mayankvadariya authored and ebyhr committed Nov 6, 2024
1 parent b00dc00 commit 640ecf2
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 12 deletions.
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,9 @@ following properties:
* - `iceberg.rest-catalog.vended-credentials-enabled`
- Use credentials provided by the REST backend for file system access.
Defaults to `false`.
* - `iceberg.rest-catalog.nested-namespace-enabled`
- Support querying objects under nested namespace.
Defaults to `false`.
:::

The following example shows a minimal catalog configuration using an Iceberg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum SessionType
private Optional<String> prefix = Optional.empty();
private Optional<String> warehouse = Optional.empty();
private Namespace parentNamespace = Namespace.of();
private boolean nestedNamespaceEnabled;
private Security security = Security.NONE;
private SessionType sessionType = SessionType.NONE;
private boolean vendedCredentialsEnabled;
Expand Down Expand Up @@ -98,6 +99,19 @@ public IcebergRestCatalogConfig setParentNamespace(String parentNamespace)
return this;
}

public boolean isNestedNamespaceEnabled()
{
return nestedNamespaceEnabled;
}

@Config("iceberg.rest-catalog.nested-namespace-enabled")
@ConfigDescription("Support querying objects under nested namespace")
public IcebergRestCatalogConfig setNestedNamespaceEnabled(boolean nestedNamespaceEnabled)
{
this.nestedNamespaceEnabled = nestedNamespaceEnabled;
return this;
}

@NotNull
public Security getSecurity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class TrinoIcebergRestCatalogFactory
private final Optional<String> prefix;
private final Optional<String> warehouse;
private final Namespace parentNamespace;
private final boolean nestedNamespaceEnabled;
private final SessionType sessionType;
private final boolean vendedCredentialsEnabled;
private final SecurityProperties securityProperties;
Expand Down Expand Up @@ -78,6 +79,7 @@ public TrinoIcebergRestCatalogFactory(
this.prefix = restConfig.getPrefix();
this.warehouse = restConfig.getWarehouse();
this.parentNamespace = restConfig.getParentNamespace();
this.nestedNamespaceEnabled = restConfig.isNestedNamespaceEnabled();
this.sessionType = restConfig.getSessionType();
this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled();
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
Expand Down Expand Up @@ -126,6 +128,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
sessionType,
credentials,
parentNamespace,
nestedNamespaceEnabled,
trinoVersion,
typeManager,
uniqueTableLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class TrinoRestCatalog
private final SessionType sessionType;
private final Map<String, String> credentials;
private final Namespace parentNamespace;
private final boolean nestedNamespaceEnabled;
private final String trinoVersion;
private final boolean useUniqueTableLocation;

Expand All @@ -125,6 +126,7 @@ public TrinoRestCatalog(
SessionType sessionType,
Map<String, String> credentials,
Namespace parentNamespace,
boolean nestedNamespaceEnabled,
String trinoVersion,
TypeManager typeManager,
boolean useUniqueTableLocation)
Expand All @@ -134,6 +136,7 @@ public TrinoRestCatalog(
this.sessionType = requireNonNull(sessionType, "sessionType is null");
this.credentials = ImmutableMap.copyOf(requireNonNull(credentials, "credentials is null"));
this.parentNamespace = requireNonNull(parentNamespace, "parentNamespace is null");
this.nestedNamespaceEnabled = nestedNamespaceEnabled;
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.useUniqueTableLocation = useUniqueTableLocation;
Expand All @@ -154,7 +157,12 @@ public boolean namespaceExists(ConnectorSession session, String namespace)
@Override
public List<String> listNamespaces(ConnectorSession session)
{
return collectNamespaces(session, parentNamespace);
if (nestedNamespaceEnabled) {
return collectNamespaces(session, parentNamespace);
}
return restSessionCatalog.listNamespaces(convert(session), parentNamespace).stream()
.map(this::toSchemaName)
.collect(toImmutableList());
}

private List<String> collectNamespaces(ConnectorSession session, Namespace parentNamespace)
Expand Down Expand Up @@ -679,6 +687,9 @@ private void invalidateTableCache(SchemaTableName schemaTableName)

private Namespace toNamespace(String schemaName)
{
if (!nestedNamespaceEnabled && schemaName.contains(NAMESPACE_SEPARATOR)) {
throw new TrinoException(NOT_SUPPORTED, "Nested namespace is not enabled for this catalog");
}
if (!parentNamespace.isEmpty()) {
schemaName = parentNamespace + NAMESPACE_SEPARATOR + schemaName;
}
Expand All @@ -688,8 +699,14 @@ private Namespace toNamespace(String schemaName)
private String toSchemaName(Namespace namespace)
{
if (this.parentNamespace.isEmpty()) {
if (!nestedNamespaceEnabled && namespace.length() != 1) {
throw new TrinoException(NOT_SUPPORTED, "Nested namespace is not enabled for this catalog");
}
return namespace.toString();
}
if (!nestedNamespaceEnabled && ((namespace.length() - parentNamespace.length()) > 1)) {
throw new TrinoException(NOT_SUPPORTED, "Nested namespace is not enabled for this catalog");
}
return Arrays.stream(namespace.levels(), this.parentNamespace.length(), namespace.length())
.collect(joining(NAMESPACE_SEPARATOR));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void testDefaults()
.setPrefix(null)
.setWarehouse(null)
.setParentNamespace(null)
.setNestedNamespaceEnabled(false)
.setSessionType(IcebergRestCatalogConfig.SessionType.NONE)
.setSecurity(IcebergRestCatalogConfig.Security.NONE)
.setVendedCredentialsEnabled(false));
Expand All @@ -45,6 +46,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest-catalog.prefix", "dev")
.put("iceberg.rest-catalog.warehouse", "test_warehouse_identifier")
.put("iceberg.rest-catalog.parent-namespace", "main")
.put("iceberg.rest-catalog.nested-namespace-enabled", "true")
.put("iceberg.rest-catalog.security", "OAUTH2")
.put("iceberg.rest-catalog.session", "USER")
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
Expand All @@ -55,6 +57,7 @@ public void testExplicitPropertyMappings()
.setPrefix("dev")
.setWarehouse("test_warehouse_identifier")
.setParentNamespace("main")
.setNestedNamespaceEnabled(true)
.setSessionType(IcebergRestCatalogConfig.SessionType.USER)
.setSecurity(IcebergRestCatalogConfig.Security.OAUTH2)
.setVendedCredentialsEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableMap;
import io.airlift.http.server.testing.TestingHttpServer;
import io.trino.filesystem.Location;
import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergQueryRunner;
import io.trino.plugin.iceberg.SchemaInitializer;
import io.trino.plugin.iceberg.TestingIcebergPlugin;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import org.apache.iceberg.BaseTable;
Expand All @@ -32,14 +35,18 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -82,18 +89,64 @@ protected QueryRunner createQueryRunner()
testServer.start();
closeAfterClass(testServer::stop);

return IcebergQueryRunner.builder("level_1.level_2")
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.addIcebergProperty("iceberg.file-format", format.name())
.addIcebergProperty("iceberg.catalog.type", "rest")
.addIcebergProperty("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
.addIcebergProperty("iceberg.register-table-procedure.enabled", "true")
.addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB")
.setSchemaInitializer(SchemaInitializer.builder()
.withSchemaName("level_1.level_2")
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
String nestedSchema = "level_1.level_2";
QueryRunner queryRunner = DistributedQueryRunner.builder(testSessionBuilder()
.setCatalog(ICEBERG_CATALOG)
.setSchema(nestedSchema)
.build())
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.build();

Map<String, String> nestedNamespaceDisabled = ImmutableMap.<String, String>builder()
.put("fs.hadoop.enabled", "true")
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "rest")
.put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.buildOrThrow();

Map<String, String> nestedNamespaceEnabled = ImmutableMap.<String, String>builder()
.putAll(nestedNamespaceDisabled)
.put("iceberg.rest-catalog.nested-namespace-enabled", "true")
.buildOrThrow();
try {
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Path dataDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
queryRunner.installPlugin(new TestingIcebergPlugin(dataDir));

queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", nestedNamespaceEnabled);
queryRunner.createCatalog("nested_namespace_disabled", "iceberg", nestedNamespaceDisabled);

SchemaInitializer.builder()
.withSchemaName(nestedSchema)
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
.build()
.accept(queryRunner);

return queryRunner;
}
catch (Exception e) {
closeAllSuppress(e, queryRunner);
throw e;
}
}

@Test
void testNestedNamespaceDisabled()
{
// SHOW SCHEMAS returns only the top-level schema
assertThat(query("SHOW SCHEMAS FROM nested_namespace_disabled"))
.skippingTypesCheck()
.containsAll("VALUES 'level_1'");

// Other statements should fail
assertQueryFails("SHOW TABLES IN nested_namespace_disabled.\"level_1.level_2\"", "Nested namespace is not enabled for this catalog");
assertQueryFails("CREATE SCHEMA nested_namespace_disabled.\"level_1.level_2.level_3\"", "Nested namespace is not enabled for this catalog");
assertQueryFails("CREATE TABLE nested_namespace_disabled.\"level_1.level_2\".test_nested(x int)", "Nested namespace is not enabled for this catalog");
assertQueryFails("SELECT * FROM nested_namespace_disabled.\"level_1.level_2\".region", "Nested namespace is not enabled for this catalog");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLoc
NONE,
ImmutableMap.of(),
Namespace.empty(),
false,
"test",
new TestingTypeManager(),
useUniqueTableLocations);
Expand Down

0 comments on commit 640ecf2

Please sign in to comment.