Fix durability of INSERT in Iceberg
Previously, no locking was applied when writing Iceberg data, so in the case
of concurrent writes (from the same cluster, from multiple Trino clusters, or
from different applications), successfully committed data could be made
unreachable by a concurrent transaction's commit.

This behavior is illustrated by a test added here. Before the fix, the writes
would all succeed, but part of the written data would not be visible in the
final table state.
findepi committed Oct 15, 2021
1 parent b31b4a7 commit 005c74c
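
Before the diff, a minimal sketch of the lost-update race described above.
This is hypothetical illustration code, not Trino's API: the class and method
names are invented, an AtomicReference stands in for the metastore's
metadata_location table parameter, and compareAndSet stands in for the
exclusive metastore table lock that this commit introduces.

// Hypothetical sketch of the race described above; names are invented and
// this is not Trino code.
import java.util.concurrent.atomic.AtomicReference;

public class LostCommitSketch
{
    // Stands in for the metastore's metadata_location table parameter.
    static final AtomicReference<String> metadataLocation = new AtomicReference<>("snapshot-0");

    // Unlocked commit: the check and the write are separate steps. Two
    // concurrent writers can both observe "snapshot-0" and pass the check;
    // the later write then discards the earlier writer's snapshot, even
    // though that writer's INSERT appeared to succeed.
    static void commitUnlocked(String expectedLocation, String newLocation)
    {
        if (!metadataLocation.get().equals(expectedLocation)) {
            throw new IllegalStateException("Concurrent update detected");
        }
        metadataLocation.set(newLocation); // not atomic with the check above
    }

    // With the check and the swap made atomic (here via compareAndSet, in the
    // actual change via an exclusive metastore table lock), one of two
    // concurrent commits fails cleanly instead of silently losing data.
    static void commitLocked(String expectedLocation, String newLocation)
    {
        if (!metadataLocation.compareAndSet(expectedLocation, newLocation)) {
            throw new IllegalStateException("Concurrent update detected");
        }
    }

    public static void main(String[] args)
            throws InterruptedException
    {
        Thread t1 = new Thread(() -> commitUnlocked("snapshot-0", "snapshot-1a"));
        Thread t2 = new Thread(() -> commitUnlocked("snapshot-0", "snapshot-1b"));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Both threads may report success, yet only one snapshot remains reachable.
        System.out.println("final metadata location: " + metadataLocation.get());
    }
}

In the actual change below, HiveMetastoreTableOperations gets the same
check-and-swap atomicity by wrapping the metadata-location check and the
metastore.replaceTable() call in acquireTableExclusiveLock/releaseTableLock.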
Showing 8 changed files with 247 additions and 32 deletions.
@@ -173,7 +173,7 @@ public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Tab
         return result;
     }
 
-    static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
+    public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
     {
         org.apache.hadoop.hive.metastore.api.Table result = new org.apache.hadoop.hive.metastore.api.Table();
         result.setDbName(table.getDatabaseName());
@@ -51,8 +51,6 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
     {
         String newMetadataLocation = writeNewMetadata(metadata, version + 1);
 
-        // TODO: use metastore locking
-
         Table table;
         try {
             Table currentTable = getTable();
@@ -17,8 +17,10 @@
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.PrincipalPrivileges;
 import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.hive.metastore.thrift.ThriftMetastore;
 import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.TableNotFoundException;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.FileIO;
@@ -29,60 +31,77 @@
 
 import static com.google.common.base.Preconditions.checkState;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
+import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
 import static java.util.Objects.requireNonNull;
 
 @NotThreadSafe
 public class HiveMetastoreTableOperations
         extends AbstractMetastoreTableOperations
 {
+    private final ThriftMetastore thriftMetastore;
+
     public HiveMetastoreTableOperations(
             FileIO fileIo,
             HiveMetastore metastore,
+            ThriftMetastore thriftMetastore,
             ConnectorSession session,
             String database,
             String table,
             Optional<String> owner,
             Optional<String> location)
     {
         super(fileIo, metastore, session, database, table, owner, location);
+        this.thriftMetastore = requireNonNull(thriftMetastore, "thriftMetastore is null");
     }
 
     @Override
     protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
     {
         String newMetadataLocation = writeNewMetadata(metadata, version + 1);
+        HiveIdentity identity = new HiveIdentity(session);
 
-        // TODO: use metastore locking
-
-        Table table;
+        long lockId = thriftMetastore.acquireTableExclusiveLock(
+                identity,
+                session.getQueryId(),
+                database,
+                tableName);
         try {
-            Table currentTable = getTable();
+            Table table;
+            try {
+                Table currentTable = fromMetastoreApiTable(thriftMetastore.getTable(identity, database, tableName)
+                        .orElseThrow(() -> new TableNotFoundException(getSchemaTableName())));
 
-            checkState(currentMetadataLocation != null, "No current metadata location for existing table");
-            String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
-            if (!currentMetadataLocation.equals(metadataLocation)) {
-                throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
-                        currentMetadataLocation, metadataLocation, getSchemaTableName());
-            }
+                checkState(currentMetadataLocation != null, "No current metadata location for existing table");
+                String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
+                if (!currentMetadataLocation.equals(metadataLocation)) {
+                    throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
+                            currentMetadataLocation, metadataLocation, getSchemaTableName());
+                }
 
-            table = Table.builder(currentTable)
-                    .setDataColumns(toHiveColumns(metadata.schema().columns()))
-                    .withStorage(storage -> storage.setLocation(metadata.location()))
-                    .setParameter(METADATA_LOCATION, newMetadataLocation)
-                    .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
-                    .build();
-        }
-        catch (RuntimeException e) {
-            try {
-                io().deleteFile(newMetadataLocation);
-            }
-            catch (RuntimeException ex) {
-                e.addSuppressed(ex);
-            }
-            throw e;
-        }
+                table = Table.builder(currentTable)
+                        .setDataColumns(toHiveColumns(metadata.schema().columns()))
+                        .withStorage(storage -> storage.setLocation(metadata.location()))
+                        .setParameter(METADATA_LOCATION, newMetadataLocation)
+                        .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
+                        .build();
+            }
+            catch (RuntimeException e) {
+                try {
+                    io().deleteFile(newMetadataLocation);
+                }
+                catch (RuntimeException ex) {
+                    e.addSuppressed(ex);
+                }
+                throw e;
+            }
 
-        PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
-        HiveIdentity identity = new HiveIdentity(session);
-        metastore.replaceTable(identity, database, tableName, table, privileges);
+            PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
+            metastore.replaceTable(identity, database, tableName, table, privileges);
+        }
+        finally {
+            thriftMetastore.releaseTableLock(identity, lockId);
+        }
+
         shouldRefresh = true;
     }
 }
@@ -15,6 +15,7 @@
 
 import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
 import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.thrift.ThriftMetastore;
 import io.trino.plugin.iceberg.FileIoProvider;
 import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
 import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
@@ -30,11 +31,13 @@ public class HiveMetastoreTableOperationsProvider
         implements IcebergTableOperationsProvider
 {
     private final FileIoProvider fileIoProvider;
+    private final ThriftMetastore thriftMetastore;
 
     @Inject
-    public HiveMetastoreTableOperationsProvider(FileIoProvider fileIoProvider)
+    public HiveMetastoreTableOperationsProvider(FileIoProvider fileIoProvider, ThriftMetastore thriftMetastore)
     {
         this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
+        this.thriftMetastore = requireNonNull(thriftMetastore, "thriftMetastore is null");
     }
 
     @Override
@@ -51,6 +54,7 @@ public IcebergTableOperations createTableOperations(
         return new HiveMetastoreTableOperations(
                 fileIoProvider.createFileIo(hdfsContext, queryId),
                 hiveMetastore,
+                thriftMetastore,
                 session,
                 database,
                 table,
5 changes: 5 additions & 0 deletions testing/trino-product-tests/pom.xml
@@ -76,6 +76,11 @@
             <artifactId>tempto-runner</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>http-client</artifactId>
@@ -34,6 +34,13 @@ public QueryExecutor queryExecutor()
             return onTrino();
         }
     },
+    SPARK {
+        @Override
+        public QueryExecutor queryExecutor()
+        {
+            return onSpark();
+        }
+    },
     /**/;
 
     public abstract QueryExecutor queryExecutor();
@@ -0,0 +1,99 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.tests.product.iceberg;

import io.airlift.concurrent.MoreFutures;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.query.QueryExecutionException;
import io.trino.tempto.query.QueryExecutor;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.TestGroups.HMS_ONLY;
import static io.trino.tests.product.TestGroups.ICEBERG;
import static io.trino.tests.product.TestGroups.STORAGE_FORMATS_DETAILED;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.util.concurrent.TimeUnit.SECONDS;

public class TestIcebergInsert
        extends ProductTest
{
    /**
     * @see TestIcebergCreateTable#testCreateTable() See TestIcebergCreateTable for non-concurrent INSERT test coverage.
     * @see TestIcebergSparkCompatibility#testTrinoSparkConcurrentInsert()
     */
    @Test(groups = {ICEBERG, STORAGE_FORMATS_DETAILED, HMS_ONLY}, timeOut = 60_000)
    public void testIcebergConcurrentInsert()
            throws Exception
    {
        int threads = 3;
        int insertsPerThread = 7;

        String tableName = "iceberg.default.test_insert_concurrent_" + randomTableSuffix();
        onTrino().executeQuery("CREATE TABLE " + tableName + "(a bigint)");

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CyclicBarrier barrier = new CyclicBarrier(threads);
            QueryExecutor onTrino = onTrino();
            List<Long> allInserted = executor.invokeAll(
                    IntStream.range(0, threads)
                            .mapToObj(thread -> (Callable<List<Long>>) () -> {
                                List<Long> inserted = new ArrayList<>();
                                for (int i = 0; i < insertsPerThread; i++) {
                                    barrier.await(20, SECONDS);
                                    long value = i + (long) insertsPerThread * thread;
                                    try {
                                        onTrino.executeQuery("INSERT INTO " + tableName + " VALUES " + value);
                                    }
                                    catch (QueryExecutionException queryExecutionException) {
                                        // failed to insert
                                        continue;
                                    }
                                    inserted.add(value);
                                }
                                return inserted;
                            })
                            .collect(toImmutableList())).stream()
                    .map(MoreFutures::getDone)
                    .flatMap(List::stream)
                    .collect(toImmutableList());

            // At least one INSERT per round should succeed
            Assertions.assertThat(allInserted).hasSizeBetween(insertsPerThread, threads * insertsPerThread);

            assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
                    .containsOnly(allInserted.stream()
                            .map(QueryAssert.Row::row)
                            .toArray(QueryAssert.Row[]::new));

            onTrino().executeQuery("DROP TABLE " + tableName);
        }
        finally {
            executor.shutdownNow();
        }
    }
}