Add Optional predicate to ParquetReader constructor used in Iceberg
Add requireNonNull checks on parquetPredicate and columnIndexStore in ParquetReader,
and pass Optional.empty() from Iceberg for the parquetPredicate parameter.
charlesjmorgan authored and raunaqmorarka committed Sep 7, 2022
1 parent df4f4c8 · commit c72e3ba
Showing 3 changed files with 13 additions and 8 deletions.
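
In outline: the convenience constructor now takes Optional<Predicate> and fills the per-row-group column-index list itself, instead of passing nulls down to the full constructor. A minimal schematic sketch of that delegation pattern, with stand-in types (ReaderSketch, Predicate, and ColumnIndexStore here are illustrative, not Trino source):

    import static java.util.Collections.nCopies;
    import static java.util.Objects.requireNonNull;

    import java.util.List;
    import java.util.Optional;

    class ReaderSketch
    {
        interface Predicate {}          // stand-in for Trino's Parquet Predicate
        interface ColumnIndexStore {}   // stand-in for the column-index store

        private final Optional<Predicate> parquetPredicate;
        private final List<Optional<ColumnIndexStore>> columnIndexStore;

        // Convenience constructor: no column indexes are available, so supply
        // one Optional.empty() per row group, as the diff below does with
        // nCopies(blocks.size(), Optional.empty()).
        ReaderSketch(int blockCount, Optional<Predicate> parquetPredicate)
        {
            this(parquetPredicate, nCopies(blockCount, Optional.empty()));
        }

        ReaderSketch(Optional<Predicate> parquetPredicate, List<Optional<ColumnIndexStore>> columnIndexStore)
        {
            this.parquetPredicate = requireNonNull(parquetPredicate, "parquetPredicate is null");
            this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null");
        }
    }

A caller with no predicate, such as the Iceberg call site in this commit, passes Optional.empty() rather than null.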
ParquetReader.java:

@@ -76,6 +76,7 @@
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static java.lang.Math.toIntExact;
+import static java.util.Collections.nCopies;
 import static java.util.Objects.requireNonNull;

 public class ParquetReader
@@ -130,10 +131,11 @@ public ParquetReader(
             ParquetDataSource dataSource,
             DateTimeZone timeZone,
             AggregatedMemoryContext memoryContext,
-            ParquetReaderOptions options)
+            ParquetReaderOptions options,
+            Optional<Predicate> parquetPredicate)
             throws IOException
     {
-        this(fileCreatedBy, fields, blocks, firstRowsOfBlocks, dataSource, timeZone, memoryContext, options, null, null);
+        this(fileCreatedBy, fields, blocks, firstRowsOfBlocks, dataSource, timeZone, memoryContext, options, parquetPredicate, nCopies(blocks.size(), Optional.empty()));
     }
@@ -145,7 +147,7 @@ public ParquetReader(
             DateTimeZone timeZone,
             AggregatedMemoryContext memoryContext,
             ParquetReaderOptions options,
-            Predicate parquetPredicate,
+            Optional<Predicate> parquetPredicate,
             List<Optional<ColumnIndexStore>> columnIndexStore)
             throws IOException
     {
@@ -164,14 +166,16 @@ public ParquetReader(
         checkArgument(blocks.size() == firstRowsOfBlocks.size(), "elements of firstRowsOfBlocks must correspond to blocks");

-        this.columnIndexStore = columnIndexStore;
         this.blockRowRanges = listWithNulls(this.blocks.size());
         for (PrimitiveField field : primitiveFields) {
             ColumnDescriptor columnDescriptor = field.getDescriptor();
             this.paths.put(ColumnPath.get(columnDescriptor.getPath()), columnDescriptor);
         }
-        if (parquetPredicate != null && options.isUseColumnIndex()) {
-            this.filter = parquetPredicate.toParquetFilter(timeZone);
+
+        requireNonNull(parquetPredicate, "parquetPredicate is null");
+        this.columnIndexStore = requireNonNull(columnIndexStore, "columnIndexStore is null");
+        if (parquetPredicate.isPresent() && options.isUseColumnIndex()) {
+            this.filter = parquetPredicate.get().toParquetFilter(timeZone);
         }
         else {
             this.filter = Optional.empty();
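
The rewritten guard could also collapse into a single Optional chain: since the else branch assigns Optional.empty(), toParquetFilter evidently returns an Optional. A sketch with stand-in types (not Trino source; the real toParquetFilter takes a Joda-Time DateTimeZone):

    import java.util.Optional;

    class FilterSketch
    {
        interface FilterPredicate {}

        interface Predicate
        {
            // Stand-in signature: the real method takes a DateTimeZone and,
            // as the else branch above implies, returns an Optional filter.
            Optional<FilterPredicate> toParquetFilter(String timeZone);
        }

        // Behaviorally equivalent to the diff's if/else: produce a filter only
        // when a predicate is present AND options.isUseColumnIndex() is true.
        static Optional<FilterPredicate> buildFilter(Optional<Predicate> parquetPredicate, boolean useColumnIndex, String timeZone)
        {
            return parquetPredicate
                    .filter(predicate -> useColumnIndex)
                    .flatMap(predicate -> predicate.toParquetFilter(timeZone));
        }
    }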
Second changed file (a caller that supplies column indexes; its predicate is known non-null):

@@ -285,7 +285,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
                 timeZone,
                 newSimpleAggregatedMemoryContext(),
                 options,
-                parquetPredicate,
+                Optional.of(parquetPredicate),
                 columnIndexes.build());

         ConnectorPageSource parquetPageSource = new ParquetPageSource(
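
Note the choice of Optional.of rather than Optional.ofNullable here: Optional.of throws NullPointerException on a null argument, so this call site keeps enforcing its existing non-null expectation for parquetPredicate instead of silently converting null to an absent value. A tiny self-contained demonstration (class and variable names are illustrative):

    import java.util.Optional;

    class OptionalOfDemo
    {
        public static void main(String[] args)
        {
            Object predicate = new Object();
            Optional<Object> present = Optional.of(predicate); // would throw NullPointerException for a null argument
            Optional<Object> absent = Optional.empty();        // the explicit opt-out used by the Iceberg call site
            System.out.println(present.isPresent() + " " + absent.isPresent()); // prints: true false
        }
    }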
Third changed file (the Iceberg call site, which has no predicate to pass):

@@ -1050,7 +1050,8 @@ else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
                 dataSource,
                 UTC,
                 memoryContext,
-                options);
+                options,
+                Optional.empty());

         return new ReaderPageSourceWithRowPositions(
                 new ReaderPageSource(
