From fa8248e27ae737f3a2705953c1d8f4926078c89e Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 7 Jul 2021 12:00:59 -0400
Subject: [PATCH 1/2] Fix FileHiveMetastore test version to match TestingTrinoServer

If FileHiveMetastore#createTestingFileHiveMetastore is used along with
TestingTrinoServer, the version check fails because the two report different
versions.
---
 .../io/trino/plugin/hive/metastore/file/FileHiveMetastore.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
index 56d2eb401ce4..5989bac8ed53 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
@@ -158,7 +158,7 @@ public static FileHiveMetastore createTestingFileHiveMetastore(File catalogDirec
         HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
         HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
         return new FileHiveMetastore(
-                new NodeVersion("test_version"),
+                new NodeVersion("testversion"),
                 hdfsEnvironment,
                 new MetastoreConfig(),
                 new FileHiveMetastoreConfig()

From 2295b40361c6fc0e8208c52175f3f9b5321abf01 Mon Sep 17 00:00:00 2001
From: Alex
Date: Thu, 1 Jul 2021 11:56:53 -0400
Subject: [PATCH 2/2] Fail Iceberg queries against v2 tables with row-level deletes

The v2 specification is not final, but some writers are already adding
support for it. For now, ensure that any table containing the new row-level
delete files cannot be queried.
---
 .../plugin/iceberg/IcebergSplitManager.java   |   2 +-
 .../plugin/iceberg/IcebergSplitSource.java    |  10 +-
 .../plugin/iceberg/IcebergQueryRunner.java    |  15 +-
 .../trino/plugin/iceberg/TestIcebergV2.java   | 188 ++++++++++++++++++
 4 files changed, 212 insertions(+), 3 deletions(-)
 create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
index 7efd9a93d3dd..e30221b205f3 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -71,7 +71,7 @@ public ConnectorSplitSource getSplits(
         // TODO Use residual. Right now there is no way to propagate residual to Trino but at least we can
         //      propagate it at split level so the parquet pushdown can leverage it.
-        IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks());
+        IcebergSplitSource splitSource = new IcebergSplitSource(table.getSchemaTableName(), tableScan.planTasks());

         return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
     }

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index eff0b1175c7d..136a23282fc2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -15,9 +15,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Streams;
+import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorPartitionHandle;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.SchemaTableName;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.io.CloseableIterable;
@@ -32,17 +34,20 @@ import static com.google.common.collect.Iterators.limit;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;

 public class IcebergSplitSource
         implements ConnectorSplitSource
 {
+    private final SchemaTableName schemaTableName;
     private final CloseableIterable<CombinedScanTask> combinedScanIterable;
     private final Iterator<FileScanTask> fileScanIterator;

-    public IcebergSplitSource(CloseableIterable<CombinedScanTask> combinedScanIterable)
+    public IcebergSplitSource(SchemaTableName schemaTableName, CloseableIterable<CombinedScanTask> combinedScanIterable)
     {
+        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
         this.combinedScanIterable = requireNonNull(combinedScanIterable, "combinedScanIterable is null");

         this.fileScanIterator = Streams.stream(combinedScanIterable)
@@ -59,6 +64,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
         Iterator<FileScanTask> iterator = limit(fileScanIterator, maxSize);
         while (iterator.hasNext()) {
             FileScanTask task = iterator.next();
+            if (!task.deletes().isEmpty()) {
+                throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: " + schemaTableName);
+            }
             splits.add(toIcebergSplit(task));
         }
         return completedFuture(new ConnectorSplitBatch(splits, isFinished()));

diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
index 502750cf6029..6b3245c75c23 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
@@ -22,9 +22,11 @@ import io.trino.tpch.TpchTable;
 import org.apache.iceberg.FileFormat;

+import java.io.File;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;

 import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
 import static io.trino.testing.QueryAssertions.copyTpchTables;
@@ -60,6 +62,17 @@ public static DistributedQueryRunner createIcebergQueryRunner(Map<String, Strin
     public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String> extraProperties, FileFormat format, List<TpchTable<?>> tables)
             throws Exception
+    {
+        return createIcebergQueryRunner(extraProperties, format, tables, Optional.empty());
+    }
+
+    public static DistributedQueryRunner createIcebergQueryRunner(
+            Map<String, String> extraProperties,
+            FileFormat format,
+            List<TpchTable<?>> tables,
+            Optional<File> metastoreDirectory)
+            throws Exception
+    {
         Session session = testSessionBuilder()
                 .setCatalog(ICEBERG_CATALOG)
                 .setSchema("tpch")
                 .build();
@@ -73,7 +86,7 @@ public static DistributedQueryRunner createIcebergQueryRunner(Map<String, Strin
-        Path dataDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
+        Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));

         queryRunner.installPlugin(new IcebergPlugin());
         Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()

diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
new file mode 100644
index 000000000000..3fa1f7428564
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.trino.plugin.hive.HdfsConfig;
+import io.trino.plugin.hive.HdfsConfiguration;
+import io.trino.plugin.hive.HdfsConfigurationInitializer;
+import io.trino.plugin.hive.HdfsEnvironment;
+import io.trino.plugin.hive.HiveHdfsConfiguration;
+import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
+import io.trino.plugin.hive.metastore.HiveMetastore;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.UUID;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
+import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
+import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
+import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static io.trino.testing.sql.TestTable.randomTableSuffix;
+import static io.trino.tpch.TpchTable.NATION;
+import static org.apache.iceberg.FileFormat.ORC;
+
+public class TestIcebergV2
+        extends AbstractTestQueryFramework
+{
+    private HiveMetastore metastore;
+    private HdfsEnvironment hdfsEnvironment;
+    private File metastoreDir;
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        HdfsConfig config = new HdfsConfig();
+        HdfsConfiguration configuration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of());
+        hdfsEnvironment = new HdfsEnvironment(configuration, config, new NoHdfsAuthentication());
+
+        File tempDir = Files.createTempDirectory("test_iceberg_v2").toFile();
+        metastoreDir = new File(tempDir, "iceberg_data");
+        metastore = createTestingFileHiveMetastore(metastoreDir);
+
+        return createIcebergQueryRunner(ImmutableMap.of(), ORC, ImmutableList.of(NATION), Optional.of(metastoreDir));
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown()
+            throws IOException
+    {
+        deleteRecursively(metastoreDir.getParentFile().toPath(), ALLOW_INSECURE);
+    }
+
+    @Test
+    public void testV2TableRead()
+    {
+        String tableName = "test_v2_table_read" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+        updateTableToV2(tableName);
+        assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
+    }
+
+    @Test
+    public void testV2TableWithPositionDelete()
+            throws Exception
+    {
+        String tableName = "test_v2_row_delete" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+        Table icebergTable = updateTableToV2(tableName);
+
+        String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1").getOnlyValue();
+
+        Path metadataDir = new Path(metastoreDir.toURI());
+        String deleteFileName = "delete_file_" + UUID.randomUUID();
+        FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir);
+
+        PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
+                .forTable(icebergTable)
+                .overwrite()
+                .buildPositionWriter();
+
+        try (Closeable ignored = writer) {
+            writer.delete(dataFilePath, 0);
+        }
+
+        icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
+        assertQueryFails("SELECT * FROM " + tableName, "Iceberg tables with delete files are not supported: tpch." + tableName);
+    }
+
+    @Test
+    public void testV2TableWithEqualityDelete()
+            throws Exception
+    {
+        String tableName = "test_v2_equality_delete" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+        Table icebergTable = updateTableToV2(tableName);
+        writeEqualityDeleteToNationTable(icebergTable);
+        assertQueryFails("SELECT * FROM " + tableName, "Iceberg tables with delete files are not supported: tpch." + tableName);
+    }
+
+    @Test
+    public void testV2TableWithUnreadEqualityDelete()
+            throws Exception
+    {
+        String tableName = "test_v2_equality_delete" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT nationkey, regionkey FROM tpch.tiny.nation", 25);
+        Table icebergTable = updateTableToV2(tableName);
+        writeEqualityDeleteToNationTable(icebergTable);
+
+        assertUpdate("INSERT INTO " + tableName + " VALUES (100, 101)", 1);
+        assertQuery("SELECT regionkey FROM " + tableName + " WHERE nationkey = 100", "VALUES 101");
+    }
+
+    private void writeEqualityDeleteToNationTable(Table icebergTable)
+            throws Exception
+    {
+        Path metadataDir = new Path(metastoreDir.toURI());
+        String deleteFileName = "delete_file_" + UUID.randomUUID();
+        FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir);
+
+        Schema deleteRowSchema = icebergTable.schema().select("regionkey");
+        EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
+                .forTable(icebergTable)
+                .rowSchema(deleteRowSchema)
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .equalityFieldIds(deleteRowSchema.findField("regionkey").fieldId())
+                .overwrite()
+                .buildEqualityWriter();
+
+        Record dataDelete = GenericRecord.create(deleteRowSchema);
+        try (Closeable ignored = writer) {
+            writer.delete(dataDelete.copy("regionkey", 1L));
+        }
+
+        icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
+    }
+
+    private Table updateTableToV2(String tableName)
+    {
+        HiveTableOperationsProvider tableOperationsProvider = new HiveTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), metastore);
+        BaseTable table = (BaseTable) loadIcebergTable(tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName));

+        TableOperations operations = table.operations();
+        TableMetadata currentMetadata = operations.current();
+        operations.commit(currentMetadata, currentMetadata.upgradeToFormatVersion(2));
+
+        return table;
+    }
+}
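Note: the snippet below is a minimal usage sketch of what the second patch adds, not part of the patches themselves. It assumes the TestIcebergV2 setup above (FileHiveMetastore-backed catalog, ORC format, NATION loaded); the class name IcebergV2UsageSketch and the local variable names are illustrative only.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.testing.DistributedQueryRunner;

import java.io.File;
import java.nio.file.Files;
import java.util.Optional;

import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static io.trino.tpch.TpchTable.NATION;
import static org.apache.iceberg.FileFormat.ORC;

public class IcebergV2UsageSketch
{
    public static void main(String[] args)
            throws Exception
    {
        // Pass an explicit metastore directory through the new Optional<File> parameter,
        // so the query runner and a FileHiveMetastore-backed test share the same catalog location.
        File metastoreDir = new File(Files.createTempDirectory("iceberg_v2_sketch").toFile(), "iceberg_data");
        DistributedQueryRunner queryRunner = createIcebergQueryRunner(
                ImmutableMap.of(), ORC, ImmutableList.of(NATION), Optional.of(metastoreDir));

        // v1 tables, and v2 tables without delete files, keep working as before.
        queryRunner.execute("SELECT count(*) FROM nation");

        // Once a v2 table carries position or equality delete files (as TestIcebergV2 provokes above),
        // IcebergSplitSource fails the query with NOT_SUPPORTED:
        //   Iceberg tables with delete files are not supported: tpch.<table>
        queryRunner.close();
    }
}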