Fail Iceberg queries against v2 tables with row level deletes #8450

Merged 2 commits on Jul 12, 2021

Changes from all commits
@@ -158,7 +158,7 @@ public static FileHiveMetastore createTestingFileHiveMetastore(File catalogDirec
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
return new FileHiveMetastore(
new NodeVersion("test_version"),
new NodeVersion("testversion"),
hdfsEnvironment,
new MetastoreConfig(),
new FileHiveMetastoreConfig()
@@ -71,7 +71,7 @@ public ConnectorSplitSource getSplits(

// TODO Use residual. Right now there is no way to propagate residual to Trino but at least we can
// propagate it at split level so the parquet pushdown can leverage it.
IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks());
IcebergSplitSource splitSource = new IcebergSplitSource(table.getSchemaTableName(), tableScan.planTasks());

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}
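
A side note on the TODO above (an editor's sketch, not part of this change): Iceberg already exposes the per-file residual filter through FileScanTask#residual(), so a later change could attach it to each split. Roughly, assuming combinedScanTask is one of the planned CombinedScanTasks:

    // Sketch only; assumes the Iceberg FileScanTask and Expressions APIs.
    for (FileScanTask fileTask : combinedScanTask.files()) {
        Expression residual = fileTask.residual(); // the part of the scan filter still to apply to this file's rows
        if (!residual.equals(Expressions.alwaysTrue())) {
            // a follow-up could serialize this into IcebergSplit so the Parquet reader can push it down
        }
    }
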
@@ -15,9 +15,11 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPartitionHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.CloseableIterable;
@@ -32,17 +34,20 @@

import static com.google.common.collect.Iterators.limit;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class IcebergSplitSource
implements ConnectorSplitSource
{
private final SchemaTableName schemaTableName;
private final CloseableIterable<CombinedScanTask> combinedScanIterable;
private final Iterator<FileScanTask> fileScanIterator;

public IcebergSplitSource(CloseableIterable<CombinedScanTask> combinedScanIterable)
public IcebergSplitSource(SchemaTableName schemaTableName, CloseableIterable<CombinedScanTask> combinedScanIterable)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.combinedScanIterable = requireNonNull(combinedScanIterable, "combinedScanIterable is null");

this.fileScanIterator = Streams.stream(combinedScanIterable)
@@ -59,6 +64,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
Iterator<FileScanTask> iterator = limit(fileScanIterator, maxSize);
while (iterator.hasNext()) {
FileScanTask task = iterator.next();
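// Reading a task while ignoring its delete files would silently return rows that were deleted,
// so fail the query up front instead of producing wrong results.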
if (!task.deletes().isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: " + schemaTableName);
}
splits.add(toIcebergSplit(task));
}
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
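
For illustration (an editor's sketch, not in the PR): the check above makes split enumeration fail fast. Assuming icebergTable is an org.apache.iceberg.Table whose current snapshot carries delete files and schemaTableName names it in Trino:

    try (CloseableIterable<CombinedScanTask> tasks = icebergTable.newScan().planTasks()) {
        IcebergSplitSource splitSource = new IcebergSplitSource(schemaTableName, tasks);
        // Throws TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: ...")
        // as soon as a batch contains a FileScanTask with a non-empty deletes() list.
        splitSource.getNextBatch(NotPartitionedPartitionHandle.NOT_PARTITIONED, 100);
    }
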
@@ -22,9 +22,11 @@
import io.trino.tpch.TpchTable;
import org.apache.iceberg.FileFormat;

import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.testing.QueryAssertions.copyTpchTables;
@@ -60,6 +62,17 @@ public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String

public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String> extraProperties, FileFormat format, List<TpchTable<?>> tables)
throws Exception
{
return createIcebergQueryRunner(extraProperties, format, tables, Optional.empty());
}

public static DistributedQueryRunner createIcebergQueryRunner(
Map<String, String> extraProperties,
FileFormat format,
List<TpchTable<?>> tables,
Optional<File> metastoreDirectory)
throws Exception
{
Session session = testSessionBuilder()
.setCatalog(ICEBERG_CATALOG)
@@ -73,7 +86,7 @@ public static DistributedQueryRunner createIcebergQueryRunner(Map<String, String
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

Path dataDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));

queryRunner.installPlugin(new IcebergPlugin());
Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
@@ -0,0 +1,188 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.hive.HdfsConfig;
import io.trino.plugin.hive.HdfsConfiguration;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveHdfsConfiguration;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Optional;
import java.util.UUID;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static io.trino.tpch.TpchTable.NATION;
import static org.apache.iceberg.FileFormat.ORC;

public class TestIcebergV2
extends AbstractTestQueryFramework
{
private HiveMetastore metastore;
private HdfsEnvironment hdfsEnvironment;
private File metastoreDir;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
HdfsConfig config = new HdfsConfig();
HdfsConfiguration configuration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of());
hdfsEnvironment = new HdfsEnvironment(configuration, config, new NoHdfsAuthentication());

File tempDir = Files.createTempDirectory("test_iceberg_v2").toFile();
metastoreDir = new File(tempDir, "iceberg_data");
metastore = createTestingFileHiveMetastore(metastoreDir);

return createIcebergQueryRunner(ImmutableMap.of(), ORC, ImmutableList.of(NATION), Optional.of(metastoreDir));
}

@AfterClass(alwaysRun = true)
public void tearDown()
throws IOException
{
deleteRecursively(metastoreDir.getParentFile().toPath(), ALLOW_INSECURE);
}

@Test
public void testV2TableRead()
{
String tableName = "test_v2_table_read" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
updateTableToV2(tableName);
Review comment (Member): As a follow-up, I would love to see a product test with Spark Iceberg for this too.

Review comment (Member): We can also add the version info in one of our system tables to make it easier to verify that Spark indeed writes v2.

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}
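
// Editor's sketch, not part of this PR: a concrete take on the review suggestion above.
// Instead of a system table, the format version can be asserted straight from Iceberg metadata,
// reusing the same calls as updateTableToV2 below. The helper name is hypothetical.
private int loadedFormatVersion(String tableName)
{
    HiveTableOperationsProvider tableOperationsProvider = new HiveTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), metastore);
    BaseTable table = (BaseTable) loadIcebergTable(tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName));
    return table.operations().current().formatVersion();
}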

@Test
public void testV2TableWithPositionDelete()
throws Exception
{
String tableName = "test_v2_row_delete" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTableToV2(tableName);

String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1").getOnlyValue();
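// A position delete targets a (file path, row ordinal) pair; the writer below deletes row 0 of this data file.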

Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir);

PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
.forTable(icebergTable)
.overwrite()
.buildPositionWriter();

try (Closeable ignored = writer) {
writer.delete(dataFilePath, 0);
}

icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
assertQueryFails("SELECT * FROM " + tableName, "Iceberg tables with delete files are not supported: tpch." + tableName);
}

@Test
public void testV2TableWithEqualityDelete()
throws Exception
{
String tableName = "test_v2_equality_delete" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTableToV2(tableName);
writeEqualityDeleteToNationTable(icebergTable);
assertQueryFails("SELECT * FROM " + tableName, "Iceberg tables with delete files are not supported: tpch." + tableName);
}

@Test
public void testV2TableWithUnreadEqualityDelete()
throws Exception
{
String tableName = "test_v2_equality_delete" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT nationkey, regionkey FROM tpch.tiny.nation", 25);
Table icebergTable = updateTableToV2(tableName);
writeEqualityDeleteToNationTable(icebergTable);
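// The insert below adds a data file that no delete file applies to, and the subsequent query
// only needs to read that file, so the split-level delete check does not fire.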

assertUpdate("INSERT INTO " + tableName + " VALUES (100, 101)", 1);
assertQuery("SELECT regionkey FROM " + tableName + " WHERE nationkey = 100", "VALUES 101");
}

private void writeEqualityDeleteToNationTable(Table icebergTable)
throws Exception
{
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir);

Schema deleteRowSchema = icebergTable.schema().select("regionkey");
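// The equality delete written below removes every previously written row whose regionkey equals 1.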
EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.equalityFieldIds(deleteRowSchema.findField("regionkey").fieldId())
.overwrite()
.buildEqualityWriter();

Record dataDelete = GenericRecord.create(deleteRowSchema);
try (Closeable ignored = writer) {
writer.delete(dataDelete.copy("regionkey", 1L));
}

icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
}

private Table updateTableToV2(String tableName)
{
HiveTableOperationsProvider tableOperationsProvider = new HiveTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), metastore);
BaseTable table = (BaseTable) loadIcebergTable(tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName));

TableOperations operations = table.operations();
TableMetadata currentMetadata = operations.current();
operations.commit(currentMetadata, currentMetadata.upgradeToFormatVersion(2));

return table;
}
}