[trinodb#46] Use operator stats to verify data skipped by index
李锐 committed Sep 15, 2022
1 parent 162748f commit c86d29b
Showing 1 changed file with 49 additions and 14 deletions.
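
In short: the old test asserted only that the query returned no rows and left a TODO to verify that the index actually skipped the split; the new test runs the query through the distributed runner, looks up the scan operator's runtime statistics by QueryId, and asserts that the scan output zero positions. A minimal sketch of the pattern, assuming a test class extending AbstractTestQueryFramework and the getTableScanStats helper added in the diff below (the wrapper name assertNoRowsScanned is invented for illustration):

    import io.trino.operator.OperatorStats;
    import io.trino.spi.QueryId;
    import io.trino.testing.MaterializedResult;
    import io.trino.testing.ResultWithQueryId;
    import java.util.List;
    import static org.testng.Assert.assertEquals;

    // Inside the test class:
    private void assertNoRowsScanned(String sql, String tableName)
    {
        // Run through the distributed runner so the QueryId comes back with the result.
        ResultWithQueryId<MaterializedResult> result =
                getDistributedQueryRunner().executeWithQueryId(getSession(), sql);
        assertEquals(result.getResult().getMaterializedRows().size(), 0);

        // If the index pruned every split, the scan operator for this table
        // reports zero output positions.
        QueryId queryId = result.getQueryId();
        List<OperatorStats> stats = getTableScanStats(queryId, tableName);
        assertEquals(stats.size(), 1);
        assertEquals(stats.get(0).getOutputPositions(), 0L);
    }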
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import io.trino.operator.OperatorStats;
import io.trino.plugin.hive.HdfsConfig;
import io.trino.plugin.hive.HdfsConfiguration;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
@@ -25,10 +26,15 @@
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
+import io.trino.spi.QueryId;
import io.trino.spi.connector.SchemaTableName;
+import io.trino.sql.planner.optimizations.PlanNodeSearcher;
+import io.trino.sql.planner.plan.PlanNode;
+import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.MaterializedRow;
+import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
+import io.trino.testing.ResultWithQueryId;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CorrelatedColumns;
import org.apache.iceberg.DataFile;
@@ -45,6 +51,7 @@
import org.apache.iceberg.index.Index;
import org.apache.iceberg.index.IndexFactory;
import org.apache.iceberg.index.IndexWriter;
+import org.apache.iceberg.index.IndexWriterResult;
import org.apache.iceberg.index.util.IndexUtils;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
@@ -58,13 +65,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;

/**
* Test reading iceberg tables with indices.
@@ -96,7 +105,7 @@ protected QueryRunner createQueryRunner()
this.metastore = createTestingFileHiveMetastore(metastoreDir);

return createIcebergQueryRunner(
-ImmutableMap.of(),
+ImmutableMap.of("enable-dynamic-filtering", "false"),
ImmutableMap.of("iceberg.read-indices-switch-on", "true"),
Collections.emptyList(),
Optional.of(metastoreDir));
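
Note on the hunk above: the commit also turns off dynamic filtering in the runner's extra properties, presumably so that an empty scan of fact can only be credited to index-based split pruning and not to runtime dynamic filters built from the dim side of the join.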
@@ -125,8 +134,7 @@ public void testFieldID()
new Object[] {1, 2},
new Object[] {3, 4},
new Object[] {5, 6}
-},
-table.currentSnapshot().snapshotId());
+});

// only select y from the table, to make sure we're using the correct index to filter the data
assertEquals(getQueryRunner().execute("select y from foo where y=2").getMaterializedRows().size(), 1);
@@ -139,7 +147,7 @@ public void testCorrColFilter()
getQueryRunner().execute("create table fact(f_k int,v int)");
getQueryRunner().execute("create table dim(d_k int,v double)");
getQueryRunner().execute("insert into fact values (1,1),(1,2),(3,3)");
getQueryRunner().execute("insert into dim values (1,1.0),(2,2.0),(3,3.0)");
getQueryRunner().execute("insert into dim values (1,1.0),(2,2.0),(3,3.0),(4,4.0)");
Table fact = loadIcebergTable("fact");
// alter table fact add correlated column (v as dim_v) from inner join dim on f_k=d_k with pk_fk
CorrelatedColumns.Builder corrColBuilder = new CorrelatedColumns.Builder(fact.schema(), false)
@@ -163,13 +171,39 @@ public void testCorrColFilter()
new Object[] {1, 1, 1.0},
new Object[] {1, 2, 1.0},
new Object[] {3, 3, 3.0}
-},
-fact.currentSnapshot().snapshotId());
+});
+fact.refresh();

-List<MaterializedRow> results = getQueryRunner().execute("select sum(fact.v) from fact join dim on f_k=d_k where dim.v=5.0 group by f_k").getMaterializedRows();
-// TODO: verify split is skipped by index
-assertEquals(results.size(), 0);
+ResultWithQueryId<MaterializedResult> resultWithId = getDistributedQueryRunner().executeWithQueryId(
+getSession(), "select sum(fact.v) from fact join dim on f_k=d_k where dim.v=5.0 group by f_k");
+assertEquals(resultWithId.getResult().getMaterializedRows().size(), 0);
+QueryId queryId = resultWithId.getQueryId();
+List<OperatorStats> operatorStats = getTableScanStats(queryId, "fact");
+assertEquals(operatorStats.size(), 1);
+assertEquals(operatorStats.get(0).getOutputPositions(), 0L);
}

+private List<OperatorStats> getTableScanStats(QueryId queryId, String tableName)
+{
+List<OperatorStats> operatorStats =
+getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats().getOperatorSummaries();
+List<OperatorStats> allTSStats = operatorStats.stream()
+.filter(x -> x.getOperatorType().equals("ScanFilterAndProjectOperator")).collect(Collectors.toList());
+PlanNode root = getDistributedQueryRunner().getQueryPlan(queryId).getRoot();
+List<OperatorStats> res = new ArrayList<>();
+for (OperatorStats stats : allTSStats) {
+PlanNode start = PlanNodeSearcher.searchFrom(root).where(n -> n.getId().equals(stats.getPlanNodeId())).findOnlyElement(null);
+assertNotNull(start, String.format("PlanNodeId (%s) not found in query plan", stats.getPlanNodeId()));
+Optional<PlanNode> tableScan = PlanNodeSearcher.searchFrom(start).where(node -> {
+if (node instanceof TableScanNode) {
+TableScanNode ts = (TableScanNode) node;
+return ((IcebergTableHandle) ts.getTable().getConnectorHandle()).getTableName().equals(tableName);
+}
+return false;
+}).findSingle();
+tableScan.ifPresent(ignore -> res.add(stats));
+}
+return res;
+}

private Table loadIcebergTable(String tableName)
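
Note on the helper above: operator summaries identify operators only by plan node id, so getTableScanStats collects the ScanFilterAndProjectOperator summaries, locates each one's node in the query plan, and keeps the stats whose subtree scans the requested Iceberg table. The same helper also works in the positive direction; a hypothetical counterpart check, not part of this commit (it assumes a static import of org.testng.Assert.assertTrue and that the matching split survives index pruning):

    // dim.v = 3.0 matches d_k = 3, so the split holding f_k = 3 should be kept
    // and the scan over fact must output at least one position.
    ResultWithQueryId<MaterializedResult> matched = getDistributedQueryRunner().executeWithQueryId(
            getSession(), "select sum(fact.v) from fact join dim on f_k=d_k where dim.v=3.0 group by f_k");
    List<OperatorStats> scanStats = getTableScanStats(matched.getQueryId(), "fact");
    assertEquals(scanStats.size(), 1);
    assertTrue(scanStats.get(0).getOutputPositions() > 0);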
@@ -178,18 +212,18 @@ private Table loadIcebergTable(String tableName)
SESSION, new SchemaTableName("tpch", tableName));
}

-private void generateIndexFiles(Table table, FileScanTask task, Object[][] rows, long snapshotId)
+private void generateIndexFiles(Table table, FileScanTask task, Object[][] rows)
throws Exception
{
Path indexRootPath = new Path(table.location(), "index");
-DataFile newDataFile = generateIndexFiles(table, task, indexRootPath, rows, snapshotId);
+DataFile newDataFile = generateIndexFiles(table, task, indexRootPath, rows);
RewriteFiles rewriteFiles = table.newRewrite();
rewriteFiles.rewriteFiles(Sets.newHashSet(task.file()), Sets.newHashSet(newDataFile));
rewriteFiles.commit();
}

// generate all required index files for a source file, and return the updated DataFile
-private DataFile generateIndexFiles(Table table, FileScanTask task, Path indexRootPath, Object[][] rows, long snapshotId)
+private DataFile generateIndexFiles(Table table, FileScanTask task, Path indexRootPath, Object[][] rows)
throws Exception
{
DataFile sourceFile = task.file();
@@ -204,7 +238,8 @@ private DataFile generateIndexFiles(Table table, FileScanTask task, Path indexRo
for (Object[] row : rows) {
indexWriter.addData(row[ordinal]);
}
-IndexFile indexFile = new IndexFile(indexField.indexId(), false, indexPath.toString());
+IndexWriterResult writerResult = indexWriter.finish();
+IndexFile indexFile = new IndexFile(indexField.indexId(), writerResult.isInPlace(), writerResult.getIndexData());
indexFiles.add(indexFile);
}
return DataFiles.builder(table.spec()).copy(sourceFile).withIndexFile(indexFiles).build();
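
Note on the final hunk: instead of hardcoding the IndexFile constructor arguments (false and a path derived from indexPath), the writer is now finished explicitly, and the in-place flag and the index data location are taken from the IndexWriterResult it returns rather than assumed.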