diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
index 9d0f6076a145..2291fb157bc7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
@@ -173,7 +173,7 @@ public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table, PrincipalPrivileges privileges)
         return result;
     }
 
-    static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
+    public static org.apache.hadoop.hive.metastore.api.Table toMetastoreApiTable(Table table)
     {
         org.apache.hadoop.hive.metastore.api.Table result = new org.apache.hadoop.hive.metastore.api.Table();
         result.setDbName(table.getDatabaseName());
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
index c1f8d02ce23a..36c4d9559eb8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java
@@ -51,8 +51,6 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
     {
         String newMetadataLocation = writeNewMetadata(metadata, version + 1);
 
-        // TODO: use metastore locking
-
         Table table;
         try {
             Table currentTable = getTable();
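The visibility change in ThriftMetastoreUtil exists so that code outside the thrift package can convert between Trino's Table model and the Thrift API model; the Iceberg commit path below relies on the inverse conversion, fromMetastoreApiTable. A minimal sketch of the conversion pair, assuming the Trino Hive plugin is on the classpath (the wrapper class is hypothetical, not part of this change):

import io.trino.plugin.hive.metastore.Table;

import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiTable;

// Hypothetical helper: round-trips a table through its Thrift representation.
final class TableConversionSketch
{
    static Table roundTrip(Table table)
    {
        org.apache.hadoop.hive.metastore.api.Table apiTable = toMetastoreApiTable(table);
        return fromMetastoreApiTable(apiTable);
    }
}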
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
index 91825b384de5..2bb7d8b244a5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java
@@ -17,8 +17,10 @@
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.PrincipalPrivileges;
 import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.hive.metastore.thrift.ThriftMetastore;
 import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.TableNotFoundException;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.FileIO;
@@ -29,14 +31,19 @@
 
 import static com.google.common.base.Preconditions.checkState;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
+import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
+import static java.util.Objects.requireNonNull;
 
 @NotThreadSafe
 public class HiveMetastoreTableOperations
         extends AbstractMetastoreTableOperations
 {
+    private final ThriftMetastore thriftMetastore;
+
     public HiveMetastoreTableOperations(
             FileIO fileIo,
             HiveMetastore metastore,
+            ThriftMetastore thriftMetastore,
             ConnectorSession session,
             String database,
             String table,
@@ -44,45 +51,57 @@ public HiveMetastoreTableOperations(
             Optional<String> location)
     {
         super(fileIo, metastore, session, database, table, owner, location);
+        this.thriftMetastore = requireNonNull(thriftMetastore, "thriftMetastore is null");
     }
 
     @Override
     protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
     {
         String newMetadataLocation = writeNewMetadata(metadata, version + 1);
+        HiveIdentity identity = new HiveIdentity(session);
 
-        // TODO: use metastore locking
-
-        Table table;
+        long lockId = thriftMetastore.acquireTableExclusiveLock(
+                identity,
+                session.getQueryId(),
+                database,
+                tableName);
         try {
-            Table currentTable = getTable();
+            Table table;
+            try {
+                Table currentTable = fromMetastoreApiTable(thriftMetastore.getTable(identity, database, tableName)
+                        .orElseThrow(() -> new TableNotFoundException(getSchemaTableName())));
 
-            checkState(currentMetadataLocation != null, "No current metadata location for existing table");
-            String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
-            if (!currentMetadataLocation.equals(metadataLocation)) {
-                throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
-                        currentMetadataLocation, metadataLocation, getSchemaTableName());
-            }
+                checkState(currentMetadataLocation != null, "No current metadata location for existing table");
+                String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
+                if (!currentMetadataLocation.equals(metadataLocation)) {
+                    throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
+                            currentMetadataLocation, metadataLocation, getSchemaTableName());
+                }
 
-            table = Table.builder(currentTable)
-                    .setDataColumns(toHiveColumns(metadata.schema().columns()))
-                    .withStorage(storage -> storage.setLocation(metadata.location()))
-                    .setParameter(METADATA_LOCATION, newMetadataLocation)
-                    .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
-                    .build();
-        }
-        catch (RuntimeException e) {
-            try {
-                io().deleteFile(newMetadataLocation);
+                table = Table.builder(currentTable)
+                        .setDataColumns(toHiveColumns(metadata.schema().columns()))
+                        .withStorage(storage -> storage.setLocation(metadata.location()))
+                        .setParameter(METADATA_LOCATION, newMetadataLocation)
+                        .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
+                        .build();
             }
-            catch (RuntimeException ex) {
-                e.addSuppressed(ex);
+            catch (RuntimeException e) {
+                try {
+                    io().deleteFile(newMetadataLocation);
+                }
+                catch (RuntimeException ex) {
+                    e.addSuppressed(ex);
+                }
+                throw e;
             }
-            throw e;
+
+            PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
+            metastore.replaceTable(identity, database, tableName, table, privileges);
+        }
+        finally {
+            thriftMetastore.releaseTableLock(identity, lockId);
         }
 
-        PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
-        HiveIdentity identity = new HiveIdentity(session);
-        metastore.replaceTable(identity, database, tableName, table, privileges);
+        shouldRefresh = true;
     }
 }
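The shape of the commit above: acquire an exclusive metastore lock, re-read the table under the lock, verify the current metadata location still matches the base this commit was built on, replace the table, and release the lock in a finally block. A self-contained, in-process analogue of that protocol (illustrative only; the real code locks across processes through the Hive metastore, not via a JVM lock):

import java.util.concurrent.locks.ReentrantLock;

// The essential invariant: the "location still matches" check and the swap
// happen under the same lock, so a concurrent committer cannot overwrite a
// commit it never observed.
final class CommitProtocolSketch
{
    private final ReentrantLock tableLock = new ReentrantLock(); // stands in for the HMS table lock
    private String currentMetadataLocation = "metadata/v1.json"; // hypothetical initial state

    void commit(String expectedLocation, String newMetadataLocation)
    {
        tableLock.lock();
        try {
            // Re-read under the lock; fail if another commit won the race.
            if (!currentMetadataLocation.equals(expectedLocation)) {
                throw new IllegalStateException("Metadata location changed concurrently");
            }
            currentMetadataLocation = newMetadataLocation; // the replaceTable step
        }
        finally {
            tableLock.unlock(); // mirrors releaseTableLock in the finally block above
        }
    }
}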
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java
index 9b24302ef06e..5252bbec271e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java
@@ -15,6 +15,7 @@
 
 import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
 import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.plugin.hive.metastore.thrift.ThriftMetastore;
 import io.trino.plugin.iceberg.FileIoProvider;
 import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
 import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
@@ -30,11 +31,13 @@ public class HiveMetastoreTableOperationsProvider
         implements IcebergTableOperationsProvider
 {
     private final FileIoProvider fileIoProvider;
+    private final ThriftMetastore thriftMetastore;
 
     @Inject
-    public HiveMetastoreTableOperationsProvider(FileIoProvider fileIoProvider)
+    public HiveMetastoreTableOperationsProvider(FileIoProvider fileIoProvider, ThriftMetastore thriftMetastore)
     {
         this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
+        this.thriftMetastore = requireNonNull(thriftMetastore, "thriftMetastore is null");
     }
 
     @Override
@@ -51,6 +54,7 @@ public IcebergTableOperations createTableOperations(
         return new HiveMetastoreTableOperations(
                 fileIoProvider.createFileIo(hdfsContext, queryId),
                 hiveMetastore,
+                thriftMetastore,
                 session,
                 database,
                 table,
diff --git a/testing/trino-product-tests/pom.xml b/testing/trino-product-tests/pom.xml
index d40bce87cd29..cf9cc7c45ce3 100644
--- a/testing/trino-product-tests/pom.xml
+++ b/testing/trino-product-tests/pom.xml
@@ -76,6 +76,11 @@
             <artifactId>tempto-runner</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>http-client</artifactId>
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/Engine.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/Engine.java
index ae2d31eb66df..a4b6fc10170b 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/Engine.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/Engine.java
@@ -16,6 +16,7 @@
 import io.trino.tempto.query.QueryExecutor;
 
 import static io.trino.tests.product.utils.QueryExecutors.onHive;
+import static io.trino.tests.product.utils.QueryExecutors.onSpark;
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
 
 public enum Engine
@@ -34,6 +34,13 @@ public QueryExecutor queryExecutor()
             return onTrino();
         }
     },
+    SPARK {
+        @Override
+        public QueryExecutor queryExecutor()
+        {
+            return onSpark();
+        }
+    },
     /**/;
 
     public abstract QueryExecutor queryExecutor();
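The new io.airlift:concurrent dependency in the product-tests POM is there for MoreFutures.getDone, which the tests below use to unwrap futures that ExecutorService.invokeAll has already completed. A minimal, runnable usage sketch (class name and values are illustrative):

import io.airlift.concurrent.MoreFutures;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class GetDoneExample
{
    public static void main(String[] args)
            throws InterruptedException
    {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // invokeAll returns only after every task has finished, so each
            // Future is already done and getDone unwraps it without blocking
            // and without checked exceptions.
            List<Future<Integer>> futures = executor.invokeAll(List.of(
                    (Callable<Integer>) () -> 1,
                    () -> 2));
            int sum = futures.stream().mapToInt(MoreFutures::getDone).sum();
            System.out.println(sum); // prints 3
        }
        finally {
            executor.shutdownNow();
        }
    }
}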
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java
new file mode 100644
index 000000000000..a94ca440ce3f
--- /dev/null
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergInsert.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests.product.iceberg;
+
+import io.airlift.concurrent.MoreFutures;
+import io.trino.tempto.ProductTest;
+import io.trino.tempto.assertions.QueryAssert;
+import io.trino.tempto.query.QueryExecutionException;
+import io.trino.tempto.query.QueryExecutor;
+import org.assertj.core.api.Assertions;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.tempto.assertions.QueryAssert.assertThat;
+import static io.trino.tests.product.TestGroups.HMS_ONLY;
+import static io.trino.tests.product.TestGroups.ICEBERG;
+import static io.trino.tests.product.TestGroups.STORAGE_FORMATS_DETAILED;
+import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
+import static io.trino.tests.product.utils.QueryExecutors.onTrino;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class TestIcebergInsert
+        extends ProductTest
+{
+    /**
+     * @see TestIcebergCreateTable#testCreateTable() See TestIcebergCreateTable for non-concurrent INSERT test coverage.
+     * @see TestIcebergSparkCompatibility#testTrinoSparkConcurrentInsert()
+     */
+    @Test(groups = {ICEBERG, STORAGE_FORMATS_DETAILED, HMS_ONLY}, timeOut = 60_000)
+    public void testIcebergConcurrentInsert()
+            throws Exception
+    {
+        int threads = 3;
+        int insertsPerThread = 7;
+
+        String tableName = "iceberg.default.test_insert_concurrent_" + randomTableSuffix();
+        onTrino().executeQuery("CREATE TABLE " + tableName + "(a bigint)");
+
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        try {
+            CyclicBarrier barrier = new CyclicBarrier(threads);
+            QueryExecutor onTrino = onTrino();
+            List<Long> allInserted = executor.invokeAll(
+                    IntStream.range(0, threads)
+                            .mapToObj(thread -> (Callable<List<Long>>) () -> {
+                                List<Long> inserted = new ArrayList<>();
+                                for (int i = 0; i < insertsPerThread; i++) {
+                                    barrier.await(20, SECONDS);
+                                    long value = i + (long) insertsPerThread * thread;
+                                    try {
+                                        onTrino.executeQuery("INSERT INTO " + tableName + " VALUES " + value);
+                                    }
+                                    catch (QueryExecutionException queryExecutionException) {
+                                        // failed to insert
+                                        continue;
+                                    }
+                                    inserted.add(value);
+                                }
+                                return inserted;
+                            })
+                            .collect(toImmutableList())).stream()
+                    .map(MoreFutures::getDone)
+                    .flatMap(List::stream)
+                    .collect(toImmutableList());
+
+            // At least one INSERT per round should succeed
+            Assertions.assertThat(allInserted).hasSizeBetween(insertsPerThread, threads * insertsPerThread);
+
+            assertThat(onTrino().executeQuery("SELECT * FROM " + tableName))
+                    .containsOnly(allInserted.stream()
+                            .map(QueryAssert.Row::row)
+                            .toArray(QueryAssert.Row[]::new));
+
+            onTrino().executeQuery("DROP TABLE " + tableName);
+        }
+        finally {
+            executor.shutdownNow();
+        }
+    }
+}
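Both concurrency tests in this change use the same barrier trick: each worker blocks on a CyclicBarrier before every INSERT attempt, so the attempts proceed in lockstep rounds and genuinely contend for the commit instead of drifting apart in time. A stripped-down, JDK-only illustration of the pattern (all names are illustrative):

import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public final class BarrierRoundsExample
{
    public static void main(String[] args)
            throws InterruptedException
    {
        int workers = 3;
        int rounds = 5;
        CyclicBarrier barrier = new CyclicBarrier(workers);
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            executor.invokeAll(IntStream.range(0, workers)
                    .<Callable<Void>>mapToObj(worker -> () -> {
                        for (int round = 0; round < rounds; round++) {
                            barrier.await(); // every worker starts the round together
                            System.out.printf("worker %d, round %d%n", worker, round);
                        }
                        return null;
                    })
                    .collect(Collectors.toList()));
        }
        finally {
            executor.shutdownNow();
        }
    }
}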
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index 1b233c7ec362..745b68c12c13 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -15,8 +15,13 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Streams;
+import io.airlift.concurrent.MoreFutures;
 import io.trino.tempto.ProductTest;
+import io.trino.tempto.query.QueryExecutionException;
+import io.trino.tempto.query.QueryExecutor;
 import io.trino.tempto.query.QueryResult;
+import io.trino.tests.product.hive.Engine;
+import org.assertj.core.api.Assertions;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -24,7 +29,12 @@
 import java.sql.Date;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -36,6 +46,7 @@
 import static io.trino.tempto.assertions.QueryAssert.assertThat;
 import static io.trino.tests.product.TestGroups.ICEBERG;
 import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
+import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
 import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AND_INSERT;
 import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AS_SELECT;
 import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_WITH_NO_DATA_AND_INSERT;
@@ -43,6 +54,8 @@
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
+import static java.util.Locale.ENGLISH;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.testng.Assert.assertTrue;
 
 public class TestIcebergSparkCompatibility
@@ -129,7 +142,7 @@ public void testTrinoReadingSparkData(StorageFormat storageFormat)
                 true,
                 Timestamp.valueOf("2020-06-28 14:16:00.456"),
                 Date.valueOf("1950-06-28"),
-                new byte[]{00, 01, 02, -16, -2, -1});
+                new byte[] {00, 01, 02, -16, -2, -1});
 
         assertThat(onSpark().executeQuery(
                 "SELECT " +
@@ -238,7 +251,7 @@ public void testSparkReadingTrinoData(StorageFormat storageFormat, CreateMode createMode)
                 //"2020-06-28 14:16:00.456",
                 "2021-08-03 06:32:21.123456 UTC", // Iceberg's timestamptz stores point in time, without zone
                 "1950-06-28",
-                new byte[]{00, 01, 02, -16, -2, -1}
+                new byte[] {00, 01, 02, -16, -2, -1}
                 // "01:23:45.123456"
                 /**/);
         assertThat(onTrino().executeQuery(
@@ -780,6 +793,76 @@ private void assertSelectsOnSpecialCharacters(String trinoTableName, String sparkTableName)
         }
     }
 
+    /**
+     * @see TestIcebergInsert#testIcebergConcurrentInsert()
+     */
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, timeOut = 60_000)
+    public void testTrinoSparkConcurrentInsert()
+            throws Exception
+    {
+        int insertsPerEngine = 7;
+
+        String baseTableName = "trino_spark_insert_concurrent_" + randomTableSuffix();
+        String trinoTableName = trinoTableName(baseTableName);
+        String sparkTableName = sparkTableName(baseTableName);
+        onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(e varchar, a bigint)");
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            CyclicBarrier barrier = new CyclicBarrier(2);
+            QueryExecutor onTrino = onTrino();
+            QueryExecutor onSpark = onSpark();
+            List<Row> allInserted = executor.invokeAll(
+                    Stream.of(Engine.TRINO, Engine.SPARK)
+                            .map(engine -> (Callable<List<Row>>) () -> {
+                                List<Row> inserted = new ArrayList<>();
+                                for (int i = 0; i < insertsPerEngine; i++) {
+                                    barrier.await(20, SECONDS);
+                                    String engineName = engine.name().toLowerCase(ENGLISH);
+                                    long value = i;
+                                    switch (engine) {
+                                        case TRINO:
+                                            try {
+                                                onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value));
+                                            }
+                                            catch (QueryExecutionException queryExecutionException) {
+                                                // failed to insert
+                                                continue; // next loop iteration
+                                            }
+                                            break;
+                                        case SPARK:
+                                            onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value));
+                                            break;
+                                        default:
+                                            throw new UnsupportedOperationException("Unexpected engine: " + engine);
+                                    }
+
+                                    inserted.add(row(engineName, value));
+                                }
+                                return inserted;
+                            })
+                            .collect(toImmutableList())).stream()
+                    .map(MoreFutures::getDone)
+                    .flatMap(List::stream)
+                    .collect(toImmutableList());
+
+            // At least one INSERT per round should succeed
+            Assertions.assertThat(allInserted).hasSizeBetween(insertsPerEngine, insertsPerEngine * 2);
+
+            // All Spark inserts should succeed (and not be obliterated)
+            assertThat(onTrino().executeQuery("SELECT count(*) FROM " + trinoTableName + " WHERE e = 'spark'"))
+                    .containsOnly(row(insertsPerEngine));
+
+            assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName))
+                    .containsOnly(allInserted);
+
+            onTrino().executeQuery("DROP TABLE " + trinoTableName);
+        }
+        finally {
+            executor.shutdownNow();
+        }
+    }
+
     private static String escapeSparkString(String value)
     {
         return value.replace("\\", "\\\\").replace("'", "\\'");
     }
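For readers unfamiliar with the metastore primitives used in HiveMetastoreTableOperations: acquireTableExclusiveLock and releaseTableLock are presumed to map onto the Hive metastore's Thrift locking API, roughly as sketched below. This is an assumption about the underlying Thrift calls, not an excerpt of Trino's implementation; user and hostname are placeholders.

import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockType;

import java.util.List;

// Sketch: the Thrift request an exclusive table lock presumably corresponds
// to. Acquiring and releasing would then go through the metastore client's
// lock(), checkLock() and unlock() calls, which are not shown here.
final class LockRequestSketch
{
    static LockRequest exclusiveTableLock(String database, String tableName, String user, String hostname)
    {
        LockComponent component = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
        component.setTablename(tableName);
        return new LockRequest(List.of(component), user, hostname);
    }
}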