Skip to content

Commit

Permalink
Support additional configurations for Nessie catalog in Iceberg conne…
Browse files Browse the repository at this point in the history
…ctor

To reduce the review effort, only the basic Nessie configurations were supported in trinodb#11701.
Nessie server can be deployed with Auth mode like keycloak. So, need to expose the Nessie client configurations to handle the Auth.
Along with that, some common Nessie server configurations like read-timeout-ms, connect-timeout-ms and compression-enabled properties
are exposed to have finer control over the Nessie commits.
  • Loading branch information
ajantha-bhat committed Jun 5, 2023
1 parent 26ef0fd commit 0f9f61e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 11 deletions.
18 changes: 18 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,31 @@ properties:
* - ``iceberg.nessie-catalog.default-warehouse-dir``
- Default warehouse directory for schemas created without an explicit
``location`` property. Example: ``/tmp``
* - ``iceberg.nessie-catalog.authentication.type``
- The authentication type to use.
Available values are ``NONE`` or ``BEARER``.
Example: ``BEARER``
* - ``iceberg.nessie-catalog.authentication.token``
- The token to use with ``BEARER`` authentication.
Example: ``SXVLUXUhIExFQ0tFUiEK``
* - ``iceberg.nessie-catalog.read-timeout``
- The read timeout duration for requests to the Nessie server.
Example: ``5s``
* - ``iceberg.nessie-catalog.connection-timeout``
- The connection timeout duration for connection requests to the Nessie server.
Example: ``1m``
* - ``iceberg.nessie-catalog.compression-enabled``
- Configure whether compression should be enabled or not for
requests to the Nessie server, defaults to ``true``.

.. code-block:: text
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie-catalog.uri=https://localhost:19120/api/v1
iceberg.nessie-catalog.default-warehouse-dir=/tmp
iceberg.nessie-catalog.authentication.type=BEARER
iceberg.nessie-catalog.authentication.token=SXVLUXUhIExFQ0tFUiEK
.. _iceberg-jdbc-catalog:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,34 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import org.projectnessie.client.NessieConfigConstants;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class IcebergNessieCatalogConfig
{
public enum Security
{
NONE,
BEARER
}

private String defaultReferenceName = "main";
private String defaultWarehouseDir;
private URI serverUri;
private Security security = Security.NONE;
private Optional<String> bearerToken = Optional.empty();
private Duration readTimeout = new Duration(NessieConfigConstants.DEFAULT_READ_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
private Duration connectionTimeout = new Duration(NessieConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
private boolean compressionEnabled = true;

@NotNull
public String getDefaultReferenceName()
Expand Down Expand Up @@ -68,4 +85,72 @@ public IcebergNessieCatalogConfig setDefaultWarehouseDir(String defaultWarehouse
this.defaultWarehouseDir = defaultWarehouseDir;
return this;
}

public Security getSecurity()
{
return security;
}

@Config("iceberg.nessie-catalog.authentication.type")
@ConfigDescription("The authentication type to use")
public IcebergNessieCatalogConfig setSecurity(Security security)
{
this.security = security;
return this;
}

public Optional<String> getBearerToken()
{
return bearerToken;
}

@Config("iceberg.nessie-catalog.authentication.token")
@ConfigDescription("The token to use with BEARER authentication")
@ConfigSecuritySensitive
public IcebergNessieCatalogConfig setBearerToken(String token)
{
this.bearerToken = Optional.ofNullable(token);
return this;
}

@MinDuration("1ms")
public Duration getReadTimeout()
{
return readTimeout;
}

@Config("iceberg.nessie-catalog.read-timeout")
@ConfigDescription("The read timeout for the client.")
public IcebergNessieCatalogConfig setReadTimeout(Duration readTimeout)
{
this.readTimeout = readTimeout;
return this;
}

@MinDuration("1ms")
public Duration getConnectionTimeout()
{
return connectionTimeout;
}

@Config("iceberg.nessie-catalog.connection-timeout")
@ConfigDescription("The connection timeout for the client.")
public IcebergNessieCatalogConfig setConnectionTimeout(Duration connectionTimeout)
{
this.connectionTimeout = connectionTimeout;
return this;
}

public boolean isCompressionEnabled()
{
return compressionEnabled;
}

@Config("iceberg.nessie-catalog.compression-enabled")
@ConfigDescription("Configure whether compression should be enabled or not.")
public IcebergNessieCatalogConfig setCompressionEnabled(boolean compressionEnabled)
{
this.compressionEnabled = compressionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.auth.BearerAuthenticationProvider;
import org.projectnessie.client.http.HttpClientBuilder;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieCatalogConfig.Security.BEARER;
import static java.lang.Math.toIntExact;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergNessieCatalogModule
Expand All @@ -43,15 +46,18 @@ protected void setup(Binder binder)

@Provides
@Singleton
public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig icebergNessieCatalogConfig)
public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig config)
{
return new NessieIcebergClient(
HttpClientBuilder.builder()
.withUri(icebergNessieCatalogConfig.getServerUri())
.withEnableApiCompatibilityCheck(false)
.build(NessieApiV1.class),
icebergNessieCatalogConfig.getDefaultReferenceName(),
null,
ImmutableMap.of());
HttpClientBuilder builder = HttpClientBuilder.builder()
.withUri(config.getServerUri())
.withDisableCompression(!config.isCompressionEnabled())
.withEnableApiCompatibilityCheck(false);

builder.withReadTimeout(toIntExact(config.getReadTimeout().toMillis()));
builder.withConnectionTimeout(toIntExact(config.getConnectionTimeout().toMillis()));
if (config.getSecurity().equals(BEARER)) {
config.getBearerToken().ifPresent(token -> builder.withAuthentication(BearerAuthenticationProvider.create(token)));
}
return new NessieIcebergClient(builder.build(NessieApiV1.class), config.getDefaultReferenceName(), null, ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package io.trino.plugin.iceberg.catalog.nessie;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import org.projectnessie.client.NessieConfigConstants;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
Expand All @@ -31,7 +34,12 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(IcebergNessieCatalogConfig.class)
.setDefaultWarehouseDir(null)
.setServerUri(null)
.setDefaultReferenceName("main"));
.setSecurity(IcebergNessieCatalogConfig.Security.NONE)
.setBearerToken(null)
.setCompressionEnabled(true)
.setDefaultReferenceName("main")
.setConnectionTimeout(new Duration(NessieConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
.setReadTimeout(new Duration(NessieConfigConstants.DEFAULT_READ_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)));
}

@Test
Expand All @@ -41,12 +49,22 @@ public void testExplicitPropertyMapping()
.put("iceberg.nessie-catalog.default-warehouse-dir", "/tmp")
.put("iceberg.nessie-catalog.uri", "http://localhost:xxx/api/v1")
.put("iceberg.nessie-catalog.ref", "someRef")
.put("iceberg.nessie-catalog.authentication.type", "BEARER")
.put("iceberg.nessie-catalog.authentication.token", "bearerToken")
.put("iceberg.nessie-catalog.compression-enabled", "false")
.put("iceberg.nessie-catalog.connection-timeout", "2s")
.put("iceberg.nessie-catalog.read-timeout", "5m")
.buildOrThrow();

IcebergNessieCatalogConfig expected = new IcebergNessieCatalogConfig()
.setDefaultWarehouseDir("/tmp")
.setServerUri(URI.create("http://localhost:xxx/api/v1"))
.setDefaultReferenceName("someRef");
.setDefaultReferenceName("someRef")
.setSecurity(IcebergNessieCatalogConfig.Security.BEARER)
.setBearerToken("bearerToken")
.setCompressionEnabled(false)
.setConnectionTimeout(new Duration(2, TimeUnit.SECONDS))
.setReadTimeout(new Duration(5, TimeUnit.MINUTES));

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit 0f9f61e

Please sign in to comment.