Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hive.metastore.glue.skip-archive config option #23817

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ properties:
* - `hive.metastore.glue.partitions-segments`
- Number of segments for partitioned Glue tables.
- `5`
* - `hive.metastore.glue.skip-archive`
- AWS Glue can archive older table versions, and a user can roll back a
table to any historical version if needed. By default, the Hive
connector backed by Glue does not skip archival of older table
versions.
- `false`
nineinchnick marked this conversation as resolved.
Show resolved Hide resolved
:::

(iceberg-glue-catalog)=
Expand All @@ -430,16 +436,11 @@ described with the following additional property:
* - Property name
- Description
- Default
* - `iceberg.glue.skip-archive`
- Skip archiving an old table version when creating a new version in a commit.
See [AWS Glue Skip
Archive](https://iceberg.apache.org/docs/latest/aws/#skip-archive).
- `true`
* - `iceberg.glue.cache-table-metadata`
- While updating the table in AWS Glue, store the table metadata with the
purpose of accelerating `information_schema.columns` and
`system.metadata.table_comments` queries.
- `true`
- `true`
:::

## Iceberg-specific metastores
Expand Down
2 changes: 2 additions & 0 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@
<exclude>**/TestCachedHiveGlueMetastore.java</exclude>
<exclude>**/TestGlueHiveMetastore.java</exclude>
<exclude>**/TestGlueHiveMetastoreQueries.java</exclude>
<exclude>**/TestGlueHiveMetastoreSkipArchive.java</exclude>
<exclude>**/TestHiveGlueMetadataListing.java</exclude>
<exclude>**/TestHiveGlueMetastoreAccessOperations.java</exclude>
<exclude>**/TestHiveS3AndGlueMetastoreTest.java</exclude>
Expand Down Expand Up @@ -712,6 +713,7 @@
<include>**/TestCachedHiveGlueMetastore.java</include>
<include>**/TestGlueHiveMetastore.java</include>
<include>**/TestGlueHiveMetastoreQueries.java</include>
<include>**/TestGlueHiveMetastoreSkipArchive.java</include>
<include>**/TestHiveGlueMetadataListing.java</include>
<include>**/TestHiveGlueMetastoreAccessOperations.java</include>
<include>**/TestHiveS3AndGlueMetastoreTest.java</include>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore.glue;

import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;

public class GlueHiveExecutionInterceptor
implements ExecutionInterceptor
{
private final boolean skipArchive;

GlueHiveExecutionInterceptor(boolean isSkipArchive)
{
this.skipArchive = isSkipArchive;
}

@Override
public SdkRequest modifyRequest(Context.ModifyRequest context, ExecutionAttributes executionAttributes)
{
if (context.request() instanceof UpdateTableRequest updateTableRequest) {
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
return updateTableRequest.toBuilder().skipArchive(skipArchive).build();
}
return context.request();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class GlueHiveMetastoreConfig
private int partitionSegments = 5;
private int threads = 40;
private boolean assumeCanonicalPartitionKeys;
private boolean skipArchive;

public Optional<String> getGlueRegion()
{
Expand Down Expand Up @@ -277,4 +278,17 @@ public GlueHiveMetastoreConfig setAssumeCanonicalPartitionKeys(boolean assumeCan
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
return this;
}

// Whether Glue should skip archiving the previous table version on table updates (defaults to false)
public boolean isSkipArchive()
{
    return skipArchive;
}

// Binds the hive.metastore.glue.skip-archive configuration property
@Config("hive.metastore.glue.skip-archive")
@ConfigDescription("Skip archiving an old table version when updating a table in the Glue metastore")
public GlueHiveMetastoreConfig setSkipArchive(boolean skipArchive)
{
    this.skipArchive = skipArchive;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public static GlueClient createGlueClient(GlueHiveMetastoreConfig config, OpenTe
.setCaptureExperimentalSpanAttributes(true)
.setRecordIndividualHttpError(true)
.build().newExecutionInterceptor())
.addExecutionInterceptor(new GlueHiveExecutionInterceptor(config.isSkipArchive()))
.retryStrategy(retryBuilder -> retryBuilder
.retryOnException(throwable -> throwable instanceof ConcurrentModificationException)
.backoffStrategy(BackoffStrategy.exponentialDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class GlueHiveMetastoreConfig
private int readStatisticsThreads = 5;
private int writeStatisticsThreads = 20;
private boolean assumeCanonicalPartitionKeys;
private boolean skipArchive;

public Optional<String> getGlueRegion()
{
Expand Down Expand Up @@ -276,19 +278,6 @@ public GlueHiveMetastoreConfig setGetPartitionThreads(int getPartitionThreads)
return this;
}

public boolean isAssumeCanonicalPartitionKeys()
{
return assumeCanonicalPartitionKeys;
}

@Config("hive.metastore.glue.assume-canonical-partition-keys")
@ConfigDescription("Allow conversion of non-char types (eg BIGINT, timestamp) to canonical string formats")
public GlueHiveMetastoreConfig setAssumeCanonicalPartitionKeys(boolean assumeCanonicalPartitionKeys)
{
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
return this;
}

@Min(1)
public int getReadStatisticsThreads()
{
Expand Down Expand Up @@ -317,6 +306,33 @@ public GlueHiveMetastoreConfig setWriteStatisticsThreads(int writeStatisticsThre
return this;
}

// Whether non-char partition key types may be converted to canonical string formats (defaults to false)
public boolean isAssumeCanonicalPartitionKeys()
{
    return assumeCanonicalPartitionKeys;
}

// Binds the hive.metastore.glue.assume-canonical-partition-keys configuration property
@Config("hive.metastore.glue.assume-canonical-partition-keys")
@ConfigDescription("Allow conversion of non-char types (eg BIGINT, timestamp) to canonical string formats")
public GlueHiveMetastoreConfig setAssumeCanonicalPartitionKeys(boolean assumeCanonicalPartitionKeys)
{
    this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
    return this;
}

// Whether Glue should skip archiving the previous table version on table updates (defaults to false)
public boolean isSkipArchive()
{
    return skipArchive;
}

// Binds hive.metastore.glue.skip-archive; also accepts the former Iceberg-only
// property name iceberg.glue.skip-archive for backward compatibility
@Config("hive.metastore.glue.skip-archive")
@LegacyConfig("iceberg.glue.skip-archive")
@ConfigDescription("Skip archiving an old table version when updating a table in the Glue metastore")
public GlueHiveMetastoreConfig setSkipArchive(boolean skipArchive)
{
    this.skipArchive = skipArchive;
    return this;
}

@PostConstruct
public void validate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,18 @@ protected void setup(Binder binder)
// Contributes a Glue request handler that applies the skip-archive setting.
// When the option is disabled, a no-op handler is returned so the multibinder
// set binding is still satisfied.
@ProvidesIntoSet
@Singleton
@ForGlueHiveMetastore
public RequestHandler2 createSkipArchiveRequestHandler(GlueHiveMetastoreConfig config)
{
    if (!config.isSkipArchive()) {
        // No-op handler: leaves every request unchanged
        return new RequestHandler2() {};
    }
    return new SkipArchiveRequestHandler();
}

@ProvidesIntoSet
@Singleton
@ForGlueHiveMetastore
public RequestHandler2 createTelemetryRequestHandler(OpenTelemetry openTelemetry)
{
return AwsSdkTelemetry.builder(openTelemetry)
.setCaptureExperimentalSpanAttributes(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.glue;
package io.trino.plugin.hive.metastore.glue.v1;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.RequestHandler2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void testDefaults()
.setCatalogId(null)
.setPartitionSegments(5)
.setThreads(40)
.setAssumeCanonicalPartitionKeys(false));
.setAssumeCanonicalPartitionKeys(false)
.setSkipArchive(false));
}

@Test
Expand All @@ -68,6 +69,7 @@ void testExplicitPropertyMapping()
.put("hive.metastore.glue.partitions-segments", "10")
.put("hive.metastore.glue.threads", "77")
.put("hive.metastore.glue.assume-canonical-partition-keys", "true")
.put("hive.metastore.glue.skip-archive", "true")
.buildOrThrow();

GlueHiveMetastoreConfig expected = new GlueHiveMetastoreConfig()
Expand All @@ -87,7 +89,8 @@ void testExplicitPropertyMapping()
.setCatalogId("0123456789")
.setPartitionSegments(10)
.setThreads(77)
.setAssumeCanonicalPartitionKeys(true);
.setAssumeCanonicalPartitionKeys(true)
.setSkipArchive(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore.glue;

import io.trino.plugin.hive.HiveQueryRunner;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.TableVersion;

import java.util.List;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.assertj.core.api.Assertions.assertThat;

// Integration test verifying that hive.metastore.glue.skip-archive=true keeps the
// number of Glue table versions at one across table updates.
// NOTE(review): requires live AWS Glue access (GlueClient.create() uses the default
// credential/region chain) — confirm it is excluded from default CI runs.
final class TestGlueHiveMetastoreSkipArchive
        extends AbstractTestQueryFramework
{
    // Randomized schema name so concurrent or leftover runs cannot collide
    private final String testSchema = "test_schema_" + randomNameSuffix();
    // Direct Glue client used to inspect table versions independently of Trino
    private final GlueClient glueClient = GlueClient.create();

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        DistributedQueryRunner queryRunner = HiveQueryRunner.builder(testSessionBuilder()
                .setCatalog("hive")
                .setSchema(testSchema)
                .build())
                .addHiveProperty("hive.metastore", "glue")
                .addHiveProperty("hive.metastore.glue.default-warehouse-dir", "local:///glue")
                .addHiveProperty("hive.security", "allow-all")
                // The option under test: suppress Glue's archival of old table versions
                .addHiveProperty("hive.metastore.glue.skip-archive", "true")
                .setCreateTpchSchemas(false)
                .build();
        // Schema must exist before any test creates tables in it
        queryRunner.execute("CREATE SCHEMA " + testSchema);
        return queryRunner;
    }

    @AfterAll
    void cleanUpSchema()
    {
        // CASCADE also drops any tables leaked by failed tests
        getQueryRunner().execute("DROP SCHEMA " + testSchema + " CASCADE");
    }

    @Test
    void testSkipArchive()
    {
        try (TestTable table = new TestTable(getQueryRunner()::execute, "test_skip_archive", "(col int)")) {
            // A freshly created table has exactly one version recorded in Glue
            List<TableVersion> tableVersionsBeforeInsert = getTableVersions(testSchema, table.getName());
            assertThat(tableVersionsBeforeInsert).hasSize(1);
            String versionIdBeforeInsert = getOnlyElement(tableVersionsBeforeInsert).versionId();

            assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1);

            // Verify count of table versions isn't increased, but version id is changed
            List<TableVersion> tableVersionsAfterInsert = getTableVersions(testSchema, table.getName());
            assertThat(tableVersionsAfterInsert).hasSize(1);
            String versionIdAfterInsert = getOnlyElement(tableVersionsAfterInsert).versionId();
            assertThat(versionIdBeforeInsert).isNotEqualTo(versionIdAfterInsert);
        }
    }

    // Lists every version Glue has recorded for the given table
    private List<TableVersion> getTableVersions(String databaseName, String tableName)
    {
        return glueClient.getTableVersions(builder -> builder.databaseName(databaseName).tableName(tableName)).tableVersions();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void testDefaults()
.setPartitionSegments(5)
.setGetPartitionThreads(20)
.setAssumeCanonicalPartitionKeys(false)
.setSkipArchive(false)
.setReadStatisticsThreads(5)
.setWriteStatisticsThreads(20));
}
Expand All @@ -72,6 +73,7 @@ public void testExplicitPropertyMapping()
.put("hive.metastore.glue.partitions-segments", "10")
.put("hive.metastore.glue.get-partition-threads", "42")
.put("hive.metastore.glue.assume-canonical-partition-keys", "true")
.put("hive.metastore.glue.skip-archive", "true")
.put("hive.metastore.glue.read-statistics-threads", "42")
.put("hive.metastore.glue.write-statistics-threads", "43")
.buildOrThrow();
Expand All @@ -95,6 +97,7 @@ public void testExplicitPropertyMapping()
.setPartitionSegments(10)
.setGetPartitionThreads(42)
.setAssumeCanonicalPartitionKeys(true)
.setSkipArchive(true)
.setReadStatisticsThreads(42)
.setWriteStatisticsThreads(43);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
package io.trino.plugin.iceberg.catalog.glue;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

public class IcebergGlueCatalogConfig
{
private boolean cacheTableMetadata = true;
private boolean skipArchive = true;

public boolean isCacheTableMetadata()
{
Expand All @@ -32,17 +30,4 @@ public IcebergGlueCatalogConfig setCacheTableMetadata(boolean cacheTableMetadata
this.cacheTableMetadata = cacheTableMetadata;
return this;
}

public boolean isSkipArchive()
{
return skipArchive;
}

@Config("iceberg.glue.skip-archive")
nineinchnick marked this conversation as resolved.
Show resolved Hide resolved
@ConfigDescription("Skip archiving an old table version when creating a new version in a commit")
public IcebergGlueCatalogConfig setSkipArchive(boolean skipArchive)
{
this.skipArchive = skipArchive;
return this;
}
}
Loading