From 66fed2ef6bfc7af2c0a7bb3bb0bbff41c1a62019 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Thu, 24 Mar 2022 10:22:20 +0100 Subject: [PATCH] Support Nessie Catalog in Iceberg connector This PR integrates the (Nessie catalog functionality)[https://github.com/apache/iceberg/tree/master/nessie/src/main/java/org/apache/iceberg/nessie] to the Iceberg connector. It adds the following new things: * a new `CatalogType` called `NESSIE` * an `IcebergNessieCatalogModule` that sets up all necessary dependencies (including a client to connect to the Nessie server) * a `NessieConfig` that includes configuration settings required for Nessie * `TrinoNessieCatalog` + `NessieIcebergTableOperations` that implement the main behavior of the catalog * some unit and integration tests that verify that the Iceberg connector works with Nessie (note that the integration test requires a Nessie server to be started, which is being done via the `nessie-apprunner-maven-plugin`prior to the integration-test phase) This PR does not yet include documentation updates, as I'd like to get some feedback first before addressing that. --- plugin/trino-iceberg/pom.xml | 89 ++++ .../io/trino/plugin/iceberg/CatalogType.java | 1 + .../iceberg/catalog/IcebergCatalogModule.java | 3 + .../nessie/IcebergNessieCatalogModule.java | 64 +++ .../iceberg/catalog/nessie/NessieConfig.java | 66 +++ .../nessie/NessieIcebergTableOperations.java | 134 +++++ .../NessieIcebergTableOperationsProvider.java | 64 +++ .../catalog/nessie/NessieIcebergUtil.java | 143 ++++++ .../catalog/nessie/TrinoNessieCatalog.java | 469 ++++++++++++++++++ .../nessie/TrinoNessieCatalogFactory.java | 58 +++ .../catalog/nessie/UpdateableReference.java | 108 ++++ .../plugin/iceberg/TestIcebergPlugin.java | 33 ++ .../plugin/iceberg/TestNessieConfig.java | 56 +++ .../TestTrinoNessieCatalogIntegration.java | 124 +++++ 14 files changed, 1412 insertions(+) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieConfig.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperations.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperationsProvider.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergUtil.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/UpdateableReference.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestNessieConfig.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoNessieCatalogIntegration.java diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 76da28909f1c..2dc30f829871 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -16,6 +16,8 @@ ${project.parent.basedir} 0.13.1 + 0.23.1 + 0.21.4 @@ -187,6 +189,18 @@ + + org.projectnessie + nessie-client + ${dep.nessie.version} + + + + org.projectnessie + nessie-model + ${dep.nessie.version} + + org.weakref jmxutils @@ -353,6 +367,64 @@ + + org.projectnessie + nessie-apprunner-maven-plugin + ${dep.nessie-apprunner.version} + + ${skipITs} + org.projectnessie:nessie-quarkus:jar:runner:${dep.nessie.version} + + + false + + INMEMORY + + + + + org.projectnessie + nessie-quarkus + ${dep.nessie.version} + runner + + + + + start + pre-integration-test + start + + + stop + post-integration-test + stop + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.0.0-M5 + + + + ${quarkus.http.test-port} + + false + + **/TestTrinoNessieCatalogIntegration.java + + + + + + integration-test + verify + + + + @@ -374,6 +446,7 @@ **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java + **/TestTrinoNessieCatalogIntegration.java @@ -419,5 +492,21 @@ + + nessie + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/Test*Nessie*.java + + + + + + diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java index 35f46922a785..294719b1b521 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java @@ -18,5 +18,6 @@ public enum CatalogType TESTING_FILE_METASTORE, HIVE_METASTORE, GLUE, + NESSIE, /**/; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java index 9a9dbf795bd2..ee5f1ecb4fb1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java @@ -30,12 +30,14 @@ import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; +import io.trino.plugin.iceberg.catalog.nessie.IcebergNessieCatalogModule; import java.util.Optional; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; +import static io.trino.plugin.iceberg.CatalogType.NESSIE; import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; import static java.util.Objects.requireNonNull; @@ -63,6 +65,7 @@ protected void setup(Binder binder) bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule()); bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); + bindCatalogModule(NESSIE, new IcebergNessieCatalogModule()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java new file mode 100644 index 000000000000..21c9555a3b97 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.client.http.HttpClientBuilder; + +import javax.inject.Provider; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class IcebergNessieCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(NessieConfig.class); + binder.bind(UpdateableReference.class); + binder.bind(NessieApiV1.class).toProvider(NessieApiProvider.class).in(Scopes.SINGLETON); + binder.bind(IcebergTableOperationsProvider.class).to(NessieIcebergTableOperationsProvider.class).in(Scopes.SINGLETON); + newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName(); + binder.bind(TrinoCatalogFactory.class).to(TrinoNessieCatalogFactory.class).in(Scopes.SINGLETON); + newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName(); + } + + public static class NessieApiProvider + implements Provider + { + private final NessieConfig nessieConfig; + + @Inject + public NessieApiProvider(NessieConfig nessieConfig) + { + this.nessieConfig = nessieConfig; + } + + @Override + public NessieApiV1 get() + { + return HttpClientBuilder.builder() + .withUri(nessieConfig.getServerUri()) + .build(NessieApiV1.class); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieConfig.java new file mode 100644 index 000000000000..ea22e65ff9e5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieConfig.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.NotEmpty; + +public class NessieConfig +{ + private String defaultReferenceName = "main"; + private String serverUri = "http://localhost:19120/api/v1"; + private String warehouseDir; + + public String getDefaultReferenceName() + { + return defaultReferenceName; + } + + @Config("iceberg.nessie.ref") + @ConfigDescription("The default Nessie reference to work on") + public NessieConfig setDefaultReferenceName(String defaultReferenceName) + { + this.defaultReferenceName = defaultReferenceName; + return this; + } + + public String getServerUri() + { + return serverUri; + } + + @Config("iceberg.nessie.uri") + @ConfigDescription("The URI to connect to the Nessie server") + public NessieConfig setServerUri(String serverUri) + { + this.serverUri = serverUri; + return this; + } + + @NotEmpty + public String getWarehouseDir() + { + return warehouseDir; + } + + @Config("iceberg.nessie.warehouse") + @ConfigDescription("The default warehouse to use for Nessie") + public NessieConfig setWarehouseDir(String warehouseDir) + { + this.warehouseDir = warehouseDir; + return this; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperations.java new file mode 100644 index 000000000000..181d8906db66 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperations.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.model.Branch; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.ImmutableIcebergTable; +import org.projectnessie.model.Operation; + +import java.util.Optional; + +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; +import static io.trino.plugin.iceberg.catalog.nessie.NessieIcebergUtil.buildCommitMeta; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class NessieIcebergTableOperations + extends AbstractIcebergTableOperations +{ + private final NessieApiV1 nessieApi; + private final UpdateableReference reference; + + protected NessieIcebergTableOperations( + NessieApiV1 nessieApi, + UpdateableReference reference, + FileIO fileIo, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, session, database, table, owner, location); + this.nessieApi = requireNonNull(nessieApi, "nessieApi is null"); + this.reference = reference; + } + + @Override + public TableMetadata refresh() + { + NessieIcebergUtil.refreshReference(nessieApi, reference); + return super.refresh(); + } + + @Override + protected String getRefreshedLocation() + { + IcebergTable table = NessieIcebergUtil.loadTable(nessieApi, reference, + new SchemaTableName(database, tableName)); + + if (null == table) { + throw new TableNotFoundException(getSchemaTableName()); + } + + return table.getMetadataLocation(); + } + + @Override + protected void commitNewTable(TableMetadata metadata) + { + reference.checkMutable(); + verify(version == -1, "commitNewTable called on a table which already exists"); + doCommit(metadata, writeNewMetadata(metadata, 0)); + shouldRefresh = true; + } + + @Override + protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) + { + reference.checkMutable(); + verify(version >= 0, "commitToExistingTable called on a new table"); + doCommit(metadata, writeNewMetadata(metadata, version + 1)); + shouldRefresh = true; + } + + private void doCommit(TableMetadata metadata, String metadataLocation) + { + SchemaTableName tableName = new SchemaTableName(database, this.tableName); + try { + ImmutableIcebergTable.Builder newTableBuilder = ImmutableIcebergTable.builder(); + Snapshot snapshot = metadata.currentSnapshot(); + long snapshotId = snapshot != null ? snapshot.snapshotId() : -1L; + IcebergTable newTable = newTableBuilder + .snapshotId(snapshotId) + .schemaId(metadata.currentSchemaId()) + .specId(metadata.defaultSpecId()) + .sortOrderId(metadata.defaultSortOrderId()) + .metadataLocation(metadataLocation) + .build(); + + Branch branch = nessieApi.commitMultipleOperations() + .operation(Operation.Put.of(NessieIcebergUtil.toKey(tableName), newTable)) + .commitMeta( + buildCommitMeta(session, format("Trino Iceberg add table %s", + tableName))) + .branch(reference.getAsBranch()) + .commit(); + reference.updateReference(branch); + } + catch (NessieNotFoundException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, + format("Cannot commit: Reference %s no longer exists", + reference.getName()), e); + } + catch (NessieConflictException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, + format("Cannot commit: Reference hash is out of date. " + + "Update the reference %s and try again", reference.getName()), e); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperationsProvider.java new file mode 100644 index 000000000000..3d3c9c6a9ef6 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergTableOperationsProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; +import io.trino.plugin.iceberg.FileIoProvider; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.connector.ConnectorSession; +import org.projectnessie.client.api.NessieApiV1; + +import javax.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class NessieIcebergTableOperationsProvider + implements IcebergTableOperationsProvider +{ + private final FileIoProvider fileIoProvider; + private final NessieApiV1 nessieApi; + private final UpdateableReference reference; + + @Inject + public NessieIcebergTableOperationsProvider(FileIoProvider fileIoProvider, NessieApiV1 nessieApi, UpdateableReference reference) + { + this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); + this.nessieApi = requireNonNull(nessieApi, "nessieApi is null"); + this.reference = reference; + } + + @Override + public IcebergTableOperations createTableOperations( + TrinoCatalog catalog, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + return new NessieIcebergTableOperations( + nessieApi, + reference, + fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), + session, + database, + table, + owner, + location); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergUtil.java new file mode 100644 index 000000000000..7a14d3427eb8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/NessieIcebergUtil.java @@ -0,0 +1,143 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.model.Branch; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.Content; +import org.projectnessie.model.Content.Type; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.EntriesResponse; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.ImmutableCommitMeta; +import org.projectnessie.model.Reference; +import org.projectnessie.model.Tag; + +import javax.annotation.Nullable; + +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static java.lang.String.format; + +final class NessieIcebergUtil +{ + private NessieIcebergUtil() + { + } + + static ContentKey toKey(SchemaTableName tableName) + { + return ContentKey.of(org.projectnessie.model.Namespace.parse(tableName.getSchemaName()), + tableName.getTableName()); + } + + @Nullable + static IcebergTable loadTable(NessieApiV1 nessieApi, UpdateableReference reference, + SchemaTableName schemaTableName) + { + try { + ContentKey key = NessieIcebergUtil.toKey(schemaTableName); + Content table = nessieApi.getContent().key(key).reference(reference.getReference()) + .get().get(key); + return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null; + } + catch (NessieNotFoundException e) { + return null; + } + } + + static Stream tableStream(NessieApiV1 nessieApi, UpdateableReference reference, + Optional namespace) + { + try { + return nessieApi.getEntries() + .reference(reference.getReference()) + .get() + .getEntries() + .stream() + .filter(NessieIcebergUtil.namespacePredicate(namespace)) + .filter(e -> Type.ICEBERG_TABLE == e.getType()) + .map(e -> new SchemaTableName(e.getName().getNamespace().name(), + e.getName().getName())); + } + catch (NessieNotFoundException ex) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, ex); + } + } + + static ImmutableCommitMeta buildCommitMeta(ConnectorSession session, String commitMsg) + { + return CommitMeta.builder() + .message(commitMsg) + .author(session.getUser()) + .build(); + } + + static Predicate namespacePredicate(Optional namespace) + { + return namespace.>map( + ns -> entry -> org.projectnessie.model.Namespace.parse(ns) + .equals(entry.getName().getNamespace())).orElseGet(() -> e -> true); + } + + static UpdateableReference loadReference(NessieApiV1 nessieApi, String requestedRef, + String hash) + { + try { + Reference ref = requestedRef == null ? nessieApi.getDefaultBranch() + : nessieApi.getReference().refName(requestedRef).get(); + if (hash != null) { + if (ref instanceof Branch) { + ref = Branch.of(ref.getName(), hash); + } + else { + ref = Tag.of(ref.getName(), hash); + } + } + return new UpdateableReference(ref, hash != null); + } + catch (NessieNotFoundException ex) { + if (requestedRef != null) { + throw new IllegalArgumentException(format( + "Nessie ref '%s' does not exist. This ref must exist before creating a NessieCatalog.", + requestedRef), ex); + } + + throw new IllegalArgumentException( + "Nessie does not have an existing default branch." + + "Either configure an alternative ref via 'iceberg.nessie.ref' or create the default branch on the server.", + ex); + } + } + + static void refreshReference(NessieApiV1 nessieApi, UpdateableReference reference) + { + try { + reference.refresh(nessieApi); + } + catch (NessieNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + format("Failed to refresh as Reference '%s' is no longer valid.", + reference.getName()), e); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java new file mode 100644 index 000000000000..de5e9db80404 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -0,0 +1,469 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.SchemaAlreadyExistsException; +import io.trino.plugin.iceberg.IcebergSchemaProperties; +import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.TrinoPrincipal; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.util.Tasks; +import org.projectnessie.client.api.CommitMultipleOperationsBuilder; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.client.http.HttpClientException; +import org.projectnessie.error.BaseNessieClientServerException; +import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieNamespaceAlreadyExistsException; +import org.projectnessie.error.NessieNamespaceNotEmptyException; +import org.projectnessie.error.NessieNamespaceNotFoundException; +import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.error.NessieReferenceNotFoundException; +import org.projectnessie.model.Branch; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Namespace; +import org.projectnessie.model.Operation; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.plugin.iceberg.catalog.nessie.NessieIcebergUtil.buildCommitMeta; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class TrinoNessieCatalog + extends AbstractTrinoCatalog +{ + private final String warehouseLocation; + private final NessieApiV1 nessieApi; + private final NessieConfig nessieConfig; + private final UpdateableReference reference; + // TODO: this is just a workaround for now until Namespaces actually support properties + private final Map> propertiesByNamespace = new ConcurrentHashMap<>(); + + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + public TrinoNessieCatalog( + IcebergTableOperationsProvider tableOperationsProvider, + NessieApiV1 nessieApi, + String warehouseLocation, + NessieConfig nessieConfig, + boolean useUniqueTableLocation) + { + super(tableOperationsProvider, useUniqueTableLocation); + this.nessieApi = requireNonNull(nessieApi, "nessieApi is null"); + this.warehouseLocation = requireNonNull(warehouseLocation, "warehouseLocation is null"); + this.nessieConfig = requireNonNull(nessieConfig, "nessieConfig is null"); + this.reference = NessieIcebergUtil.loadReference(nessieApi, + this.nessieConfig.getDefaultReferenceName(), null); + } + + @Override + public List listNamespaces(ConnectorSession session) + { + try { + return nessieApi.getMultipleNamespaces() + .refName(reference.getName()) + .get().getNamespaces().stream() + .map(Namespace::name) + .collect(Collectors.toList()); + } + catch (NessieReferenceNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + "Cannot list Namespaces: ref is no longer valid.", e); + } + } + + @Override + public void dropNamespace(ConnectorSession session, String namespace) + { + try { + reference.checkMutable(); + nessieApi.deleteNamespace() + .refName(reference.getName()) + .namespace(namespace) + .delete(); + propertiesByNamespace.remove(namespace); + NessieIcebergUtil.refreshReference(nessieApi, reference); + } + catch (NessieNamespaceNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (NessieReferenceNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + format("Cannot drop Namespace '%s': ref is no longer valid.", namespace), + e); + } + catch (NessieNamespaceNotEmptyException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, format( + "Namespace '%s' is not empty. One or more tables exist.", namespace), e); + } + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + try { + nessieApi.getNamespace() + .refName(reference.getName()) + .namespace(namespace) + .get(); + } + catch (NessieNamespaceNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (NessieReferenceNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + format("Cannot load Namespace '%s': ref is no longer valid.", namespace), + e); + } + return propertiesByNamespace.get(namespace); + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, + String namespace) + { + return Optional.empty(); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, + Map properties, TrinoPrincipal owner) + { + try { + reference.checkMutable(); + nessieApi.createNamespace() + .refName(reference.getName()) + .namespace(namespace) + .create(); + propertiesByNamespace.put(namespace, properties); + NessieIcebergUtil.refreshReference(nessieApi, reference); + } + catch (NessieNamespaceAlreadyExistsException e) { + throw new SchemaAlreadyExistsException(namespace); + } + catch (NessieReferenceNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + format("Cannot create Namespace '%s': ref is no longer valid.", + namespace), e); + } + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, + TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, + "setNamespacePrincipal is not supported for Iceberg Nessie catalogs"); + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + throw new TrinoException(NOT_SUPPORTED, + "renameNamespace is not supported for Iceberg Nessie catalogs"); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + NessieIcebergUtil.refreshReference(nessieApi, reference); + return NessieIcebergUtil.tableStream(nessieApi, reference, namespace) + .collect(Collectors.toList()); + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName table) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + table, + ignore -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + + return getIcebergTableWithMetadata( + this, + tableOperationsProvider, + session, + table, + metadata); + } + + @Override + public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) + { + reference.checkMutable(); + BaseTable table = (BaseTable) loadTable(session, schemaTableName); + validateTableCanBeDropped(table); + + IcebergTable existingTable = NessieIcebergUtil.loadTable(nessieApi, reference, + schemaTableName); + if (existingTable == null) { + return; + } + + CommitMultipleOperationsBuilder commitBuilderBase = nessieApi.commitMultipleOperations() + .commitMeta(buildCommitMeta(session, format("Trino Iceberg delete table %s", + schemaTableName))) + .operation(Operation.Delete.of(NessieIcebergUtil.toKey(schemaTableName))); + + // We try to drop the table. Simple retry after ref update. + try { + Tasks.foreach(commitBuilderBase) + .retry(5) + .stopRetryOn(NessieNotFoundException.class) + .throwFailureWhenFinished() + .onFailure((o, exception) -> NessieIcebergUtil.refreshReference(nessieApi, + reference)) + .run(commitBuilder -> { + Branch branch = commitBuilder + .branch(reference.getAsBranch()) + .commit(); + reference.updateReference(branch); + }, BaseNessieClientServerException.class); + } + catch (NessieConflictException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + "Cannot drop table: failed after retry (update ref and retry)", e); + } + catch (NessieNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + "Cannot drop table: ref is no longer valid.", e); + } + catch (BaseNessieClientServerException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, "Cannot drop table: unknown error", e); + } + } + + @Override + public Transaction newCreateTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + String location, + Map properties) + { + return newCreateTableTransaction( + session, + schemaTableName, + schema, + partitionSpec, + location, + properties, + Optional.of(session.getUser())); + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + reference.checkMutable(); + + IcebergTable existingFromTable = NessieIcebergUtil.loadTable(nessieApi, reference, from); + if (existingFromTable == null) { + throw new SchemaNotFoundException("table %s doesn't exists", from.getTableName()); + } + IcebergTable existingToTable = NessieIcebergUtil.loadTable(nessieApi, reference, to); + if (existingToTable != null) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + format("table %s already exists", to.getTableName())); + } + + CommitMultipleOperationsBuilder operations = nessieApi.commitMultipleOperations() + .commitMeta(NessieIcebergUtil.buildCommitMeta(session, + format("Trino Iceberg rename table from '%s' to '%s'", + from, to))) + .operation(Operation.Put.of(NessieIcebergUtil.toKey(to), existingFromTable, + existingFromTable)) + .operation(Operation.Delete.of(NessieIcebergUtil.toKey(from))); + + try { + Tasks.foreach(operations) + .retry(5) + .stopRetryOn(NessieNotFoundException.class) + .throwFailureWhenFinished() + .onFailure((o, exception) -> NessieIcebergUtil.refreshReference(nessieApi, + reference)) + .run(ops -> { + Branch branch = ops + .branch(reference.getAsBranch()) + .commit(); + reference.updateReference(branch); + }, BaseNessieClientServerException.class); + } + catch (NessieNotFoundException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + "Failed to drop table as ref is no longer valid.", e); + } + catch (BaseNessieClientServerException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, + "Failed to rename table: the current reference is not up to date.", e); + } + catch (HttpClientException ex) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, ex); + } + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + String tableName = createNewTableName(schemaTableName.getTableName()); + + String databaseLocation = null; + + Map properties = propertiesByNamespace.get(schemaTableName.getSchemaName()); + if (null != properties) { + databaseLocation = (String) properties.get(IcebergSchemaProperties.LOCATION_PROPERTY); + } + + Path location; + if (databaseLocation == null) { + String schemaDirectoryName = schemaTableName.getSchemaName() + ".db"; + location = new Path(new Path(warehouseLocation, schemaDirectoryName), tableName); + } + else { + location = new Path(databaseLocation, tableName); + } + + return location.toString(); + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, + TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, + "setTablePrincipal is not supported for Iceberg Nessie catalogs"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, + ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, + "createView is not supported for Iceberg Nessie catalogs"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, + SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, + "renameView is not supported for Iceberg Nessie catalogs"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, + TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, + "setViewPrincipal is not supported for Iceberg Nessie catalogs"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, + "dropView is not supported for Iceberg Nessie catalogs"); + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, + Optional namespace) + { + return ImmutableMap.of(); + } + + @Override + public Optional getView(ConnectorSession session, + SchemaTableName viewIdentifier) + { + return Optional.empty(); + } + + @Override + public List listMaterializedViews(ConnectorSession session, + Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, + ConnectorMaterializedViewDefinition definition, boolean replace, + boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, + "createMaterializedView is not supported for Iceberg Nessie catalogs"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, + "dropMaterializedView is not supported for Iceberg Nessie catalogs"); + } + + @Override + public Optional getMaterializedView( + ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, + SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, + "renameMaterializedView is not supported for Iceberg Nessie catalogs"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java new file mode 100644 index 000000000000..c121d42c3694 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.nessie; + +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.security.ConnectorIdentity; +import org.projectnessie.client.api.NessieApiV1; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class TrinoNessieCatalogFactory + implements TrinoCatalogFactory +{ + private final IcebergTableOperationsProvider tableOperationsProvider; + private final String warehouseLocation; + private final NessieApiV1 nessieApi; + private final NessieConfig nessieConfig; + private final boolean isUniqueTableLocation; + + @Inject + public TrinoNessieCatalogFactory( + IcebergTableOperationsProvider tableOperationsProvider, + NessieApiV1 nessieApi, + NessieConfig nessieConfig, + IcebergConfig icebergConfig) + { + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, + "tableOperationsProvider is null"); + this.nessieConfig = requireNonNull(nessieConfig, "nessieConfig is null"); + this.warehouseLocation = nessieConfig.getWarehouseDir(); + this.nessieApi = requireNonNull(nessieApi, "nessieApi is null"); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation(); + } + + @Override + public TrinoCatalog create(ConnectorIdentity identity) + { + return new TrinoNessieCatalog(tableOperationsProvider, nessieApi, + warehouseLocation, nessieConfig, isUniqueTableLocation); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/UpdateableReference.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/UpdateableReference.java new file mode 100644 index 000000000000..0c227a3602af --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/UpdateableReference.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.plugin.iceberg.catalog.nessie; + +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.model.Branch; +import org.projectnessie.model.Reference; + +import java.util.StringJoiner; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class UpdateableReference +{ + private Reference reference; + private final boolean mutable; + + // required for dependency injection + public UpdateableReference() + { + this(Branch.of("main", null), false); + } + + /** + * Construct a new {@link UpdateableReference} using a Nessie reference object and a flag + * whether an explicit hash was used to create the reference object. + */ + public UpdateableReference(Reference reference, boolean hashReference) + { + this.reference = reference; + this.mutable = reference instanceof Branch && !hashReference; + } + + public boolean refresh(NessieApiV1 api) + throws NessieNotFoundException + { + if (!mutable) { + return false; + } + Reference oldReference = reference; + reference = api.getReference().refName(reference.getName()).get(); + return !oldReference.equals(reference); + } + + public void updateReference(Reference ref) + { + checkState(mutable, "Hash references cannot be updated."); + this.reference = requireNonNull(ref); + } + + public boolean isBranch() + { + return reference instanceof Branch; + } + + public String getHash() + { + return reference.getHash(); + } + + public Branch getAsBranch() + { + if (!isBranch()) { + throw new IllegalArgumentException("Reference is not a branch"); + } + return (Branch) reference; + } + + public Reference getReference() + { + return reference; + } + + public void checkMutable() + { + checkArgument(mutable, + "You can only mutate tables when using a branch without a hash or timestamp."); + } + + public String getName() + { + return reference.getName(); + } + + @Override + public String toString() + { + return new StringJoiner(", ", UpdateableReference.class.getSimpleName() + "[", "]") + .add("reference=" + reference) + .add("mutable=" + mutable) + .toString(); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index 14f5142992ef..3bb96189eea1 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -120,6 +120,16 @@ public void testRecordingMetastore() "hive.metastore-recording-path", "/tmp"), new TestingConnectorContext())) .hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used"); + + // recording with nessie + assertThatThrownBy(() -> factory.create( + "test", + Map.of( + "iceberg.catalog.type", "nessie", + "hive.metastore.nessie.region", "us-east-2", + "hive.metastore-recording-path", "/tmp"), + new TestingConnectorContext())) + .hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used"); } @Test @@ -196,4 +206,27 @@ private static ConnectorFactory getConnectorFactory() { return getOnlyElement(new IcebergPlugin().getConnectorFactories()); } + + @Test + public void testNessieMetastore() + { + ConnectorFactory factory = getConnectorFactory(); + + factory.create( + "test", + Map.of( + "iceberg.catalog.type", "nessie", + "iceberg.nessie.warehouse", "/tmp"), + new TestingConnectorContext()) + .shutdown(); + + assertThatThrownBy(() -> factory.create( + "test", + Map.of( + "iceberg.catalog.type", "nessie", + "iceberg.nessie.catalogid", "123", + "iceberg.nessie.warehouse", "/tmp"), + new TestingConnectorContext())) + .hasMessageContaining("'iceberg.nessie.catalogid' was not used"); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestNessieConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestNessieConfig.java new file mode 100644 index 000000000000..a0958fcb86a7 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestNessieConfig.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.iceberg.catalog.nessie.NessieConfig; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestNessieConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(NessieConfig.class) + .setWarehouseDir(null) + .setServerUri("http://localhost:19120/api/v1") + .setDefaultReferenceName("main")); + } + + @Test + public void testExplicitPropertyMapping() + { + String warehouseDir = "/tmp"; + String serverUri = "http://localhost:xxx/api/v1"; + String ref = "someRef"; + Map properties = ImmutableMap.builder() + .put("iceberg.nessie.warehouse", warehouseDir) + .put("iceberg.nessie.uri", serverUri) + .put("iceberg.nessie.ref", ref) + .buildOrThrow(); + + NessieConfig expected = new NessieConfig() + .setWarehouseDir(warehouseDir) + .setServerUri(serverUri) + .setDefaultReferenceName(ref); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoNessieCatalogIntegration.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoNessieCatalogIntegration.java new file mode 100644 index 000000000000..95bdae9eef9a --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoNessieCatalogIntegration.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.hive.HdfsConfig; +import io.trino.plugin.hive.HdfsConfigurationInitializer; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveHdfsConfiguration; +import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.nessie.NessieConfig; +import io.trino.plugin.iceberg.catalog.nessie.NessieIcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.nessie.TrinoNessieCatalog; +import io.trino.plugin.iceberg.catalog.nessie.UpdateableReference; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.PrincipalType; +import io.trino.spi.security.TrinoPrincipal; +import org.assertj.core.api.Assertions; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.client.http.HttpClientBuilder; +import org.projectnessie.model.Branch; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestTrinoNessieCatalogIntegration + extends BaseTrinoCatalogTest +{ + private static final Integer HTTP_TEST_PORT = Integer.getInteger("nessie.http.test-port", 19121); + + @Override + protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) + { + Path tmpDirectory = null; + try { + tmpDirectory = Files.createTempDirectory("test_nessie_catalog_warehouse_dir_"); + } + catch (IOException e) { + Assertions.fail(e.getMessage()); + } + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(new HiveHdfsConfiguration( + new HdfsConfigurationInitializer( + new HdfsConfig(), + ImmutableSet.of()), + ImmutableSet.of()), + new HdfsConfig(), + new NoHdfsAuthentication()); + NessieConfig nessieConfig = new NessieConfig(); + NessieApiV1 nessieApi = HttpClientBuilder.builder().withUri("http://127.0.0.1:" + HTTP_TEST_PORT + "/api/v1") + .build(NessieApiV1.class); + return new TrinoNessieCatalog( + new NessieIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), + nessieApi, new UpdateableReference( + Branch.of("main", null), false)), + nessieApi, + tmpDirectory.toAbsolutePath().toString(), + nessieConfig, + useUniqueTableLocations); + } + + @Test + public void testDefaultLocation() + throws IOException + { + Path tmpDirectory = Files.createTempDirectory("test_nessie_catalog_default_location_"); + tmpDirectory.toFile().deleteOnExit(); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(new HiveHdfsConfiguration( + new HdfsConfigurationInitializer( + new HdfsConfig(), + ImmutableSet.of()), + ImmutableSet.of()), + new HdfsConfig(), + new NoHdfsAuthentication()); + NessieConfig nessieConfig = new NessieConfig(); + nessieConfig.setWarehouseDir(tmpDirectory.toAbsolutePath().toString()); + NessieApiV1 nessieApi = HttpClientBuilder.builder().withUri("http://127.0.0.1:" + HTTP_TEST_PORT + "/api/v1") + .build(NessieApiV1.class); + TrinoCatalog catalogWithDefaultLocation = new TrinoNessieCatalog( + new NessieIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), + nessieApi, new UpdateableReference( + Branch.of("main", null), false)), + nessieApi, + nessieConfig.getWarehouseDir(), + nessieConfig, + false); + + String namespace = "test_default_location_" + randomTableSuffix(); + String table = "tableName"; + SchemaTableName schemaTableName = new SchemaTableName(namespace, table); + catalogWithDefaultLocation.createNamespace(SESSION, namespace, ImmutableMap.of(), + new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + try { + File expectedSchemaDirectory = new File(tmpDirectory.toFile(), namespace + ".db"); + File expectedTableDirectory = new File(expectedSchemaDirectory, + schemaTableName.getTableName()); + assertThat(catalogWithDefaultLocation.defaultTableLocation(SESSION, + schemaTableName)).isEqualTo( + expectedTableDirectory.toPath().toAbsolutePath().toString()); + } + finally { + catalogWithDefaultLocation.dropNamespace(SESSION, namespace); + } + } +}