diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergIndex.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergIndex.java
index a2b5cfb17dc1..fe3a402c92c0 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergIndex.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergIndex.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import io.trino.operator.OperatorStats;
 import io.trino.plugin.hive.HdfsConfig;
 import io.trino.plugin.hive.HdfsConfiguration;
 import io.trino.plugin.hive.HdfsConfigurationInitializer;
@@ -25,10 +26,15 @@
 import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
+import io.trino.spi.QueryId;
 import io.trino.spi.connector.SchemaTableName;
+import io.trino.sql.planner.optimizations.PlanNodeSearcher;
+import io.trino.sql.planner.plan.PlanNode;
+import io.trino.sql.planner.plan.TableScanNode;
 import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.MaterializedRow;
+import io.trino.testing.MaterializedResult;
 import io.trino.testing.QueryRunner;
+import io.trino.testing.ResultWithQueryId;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CorrelatedColumns;
 import org.apache.iceberg.DataFile;
@@ -45,6 +51,7 @@
 import org.apache.iceberg.index.Index;
 import org.apache.iceberg.index.IndexFactory;
 import org.apache.iceberg.index.IndexWriter;
+import org.apache.iceberg.index.IndexWriterResult;
 import org.apache.iceberg.index.util.IndexUtils;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
@@ -58,6 +65,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;

 import static com.google.common.io.MoreFiles.deleteRecursively;
 import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
@@ -65,6 +73,7 @@
 import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
 import static io.trino.testing.TestingConnectorSession.SESSION;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;

 /**
  * Test reading iceberg tables with indices.
@@ -96,7 +105,7 @@ protected QueryRunner createQueryRunner()
         this.metastore = createTestingFileHiveMetastore(metastoreDir);

         return createIcebergQueryRunner(
-                ImmutableMap.of(),
+                ImmutableMap.of("enable-dynamic-filtering", "false"),
                 ImmutableMap.of("iceberg.read-indices-switch-on", "true"),
                 Collections.emptyList(),
                 Optional.of(metastoreDir));
@@ -125,8 +134,7 @@ public void testFieldID()
                 new Object[] {1, 2},
                 new Object[] {3, 4},
                 new Object[] {5, 6}
-        },
-                table.currentSnapshot().snapshotId());
+        });

         // only select y from the table, to make sure we're using the correct index to filter the data
         assertEquals(getQueryRunner().execute("select y from foo where y=2").getMaterializedRows().size(), 1);
@@ -139,7 +147,7 @@ public void testCorrColFilter()
         getQueryRunner().execute("create table fact(f_k int,v int)");
         getQueryRunner().execute("create table dim(d_k int,v double)");
         getQueryRunner().execute("insert into fact values (1,1),(1,2),(3,3)");
-        getQueryRunner().execute("insert into dim values (1,1.0),(2,2.0),(3,3.0)");
+        getQueryRunner().execute("insert into dim values (1,1.0),(2,2.0),(3,3.0),(4,4.0)");
         Table fact = loadIcebergTable("fact");
         // alter table fact add correlated column (v as dim_v) from inner join dim on f_k=d_k with pk_fk
         CorrelatedColumns.Builder corrColBuilder = new CorrelatedColumns.Builder(fact.schema(), false)
@@ -163,13 +171,39 @@ public void testCorrColFilter()
                 new Object[] {1, 1, 1.0},
                 new Object[] {1, 2, 1.0},
                 new Object[] {3, 3, 3.0}
-        },
-                fact.currentSnapshot().snapshotId());
+        });

         fact.refresh();
-        List<MaterializedRow> results = getQueryRunner().execute("select sum(fact.v) from fact join dim on f_k=d_k where dim.v=5.0 group by f_k").getMaterializedRows();
-        // TODO: verify split is skipped by index
-        assertEquals(results.size(), 0);
+        ResultWithQueryId<MaterializedResult> resultWithId = getDistributedQueryRunner().executeWithQueryId(
+                getSession(), "select sum(fact.v) from fact join dim on f_k=d_k where dim.v=5.0 group by f_k");
+        assertEquals(resultWithId.getResult().getMaterializedRows().size(), 0);
+        QueryId queryId = resultWithId.getQueryId();
+        List<OperatorStats> operatorStats = getTableScanStats(queryId, "fact");
+        assertEquals(operatorStats.size(), 1);
+        assertEquals(operatorStats.get(0).getOutputPositions(), 0L);
+    }
+
+    private List<OperatorStats> getTableScanStats(QueryId queryId, String tableName)
+    {
+        List<OperatorStats> operatorStats =
+                getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats().getOperatorSummaries();
+        List<OperatorStats> allTSStats = operatorStats.stream()
+                .filter(x -> x.getOperatorType().equals("ScanFilterAndProjectOperator")).collect(Collectors.toList());
+        PlanNode root = getDistributedQueryRunner().getQueryPlan(queryId).getRoot();
+        List<OperatorStats> res = new ArrayList<>();
+        for (OperatorStats stats : allTSStats) {
+            PlanNode start = PlanNodeSearcher.searchFrom(root).where(n -> n.getId().equals(stats.getPlanNodeId())).findOnlyElement(null);
+            assertNotNull(start, String.format("PlanNodeId (%s) not found in query plan", stats.getPlanNodeId()));
+            Optional<PlanNode> tableScan = PlanNodeSearcher.searchFrom(start).where(node -> {
+                if (node instanceof TableScanNode) {
+                    TableScanNode ts = (TableScanNode) node;
+                    return ((IcebergTableHandle) ts.getTable().getConnectorHandle()).getTableName().equals(tableName);
+                }
+                return false;
+            }).findSingle();
+            tableScan.ifPresent(ignore -> res.add(stats));
+        }
+        return res;
     }

     private Table loadIcebergTable(String tableName)
@@ -178,18 +212,18 @@ private Table loadIcebergTable(String tableName)
                 SESSION, new SchemaTableName("tpch", tableName));
     }

-    private void generateIndexFiles(Table table, FileScanTask task, Object[][] rows, long snapshotId)
+    private void generateIndexFiles(Table table, FileScanTask task, Object[][] rows)
             throws Exception
     {
         Path indexRootPath = new Path(table.location(), "index");
-        DataFile newDataFile = generateIndexFiles(table, task, indexRootPath, rows, snapshotId);
+        DataFile newDataFile = generateIndexFiles(table, task, indexRootPath, rows);
         RewriteFiles rewriteFiles = table.newRewrite();
         rewriteFiles.rewriteFiles(Sets.newHashSet(task.file()), Sets.newHashSet(newDataFile));
         rewriteFiles.commit();
     }

     // generate all required index files for a source file, and return the updated DataFile
-    private DataFile generateIndexFiles(Table table, FileScanTask task, Path indexRootPath, Object[][] rows, long snapshotId)
+    private DataFile generateIndexFiles(Table table, FileScanTask task, Path indexRootPath, Object[][] rows)
             throws Exception
     {
         DataFile sourceFile = task.file();
@@ -204,7 +238,8 @@ private DataFile generateIndexFiles(Table table, FileScanTask task, Path indexRo
             for (Object[] row : rows) {
                 indexWriter.addData(row[ordinal]);
             }
-            IndexFile indexFile = new IndexFile(indexField.indexId(), false, indexPath.toString());
+            IndexWriterResult writerResult = indexWriter.finish();
+            IndexFile indexFile = new IndexFile(indexField.indexId(), writerResult.isInPlace(), writerResult.getIndexData());
             indexFiles.add(indexFile);
         }
         return DataFiles.builder(table.spec()).copy(sourceFile).withIndexFile(indexFiles).build();
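
Note (not part of the patch): the verification pattern introduced above — run the query with executeWithQueryId, locate the ScanFilterAndProjectOperator stats for a specific table via getTableScanStats, and assert on getOutputPositions — generalizes to other index-pruning checks. A minimal, hypothetical sketch that could be appended to testCorrColFilter, assuming a predicate value (dim.v=4.0) that exists in dim but lies outside the dim_v values indexed on fact, so the split should again be skipped:

    // Hypothetical follow-up assertion (sketch): a dim.v value with no matching fact join key
    // should also be pruned by the correlated-column index, so the fact scan emits no rows.
    ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId(
            getSession(), "select sum(fact.v) from fact join dim on f_k=d_k where dim.v=4.0 group by f_k");
    assertEquals(result.getResult().getMaterializedRows().size(), 0);
    List<OperatorStats> factScanStats = getTableScanStats(result.getQueryId(), "fact");
    assertEquals(factScanStats.size(), 1);
    assertEquals(factScanStats.get(0).getOutputPositions(), 0L);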