Remove unnecessary Predicate interface in parquet reader
raunaqmorarka committed Dec 13, 2022
1 parent a660133 commit 8e4ff8f
Showing 7 changed files with 48 additions and 92 deletions.

This file was deleted: the io.trino.parquet.predicate.Predicate interface that this commit removes (its imports are dropped from every call site below).

@@ -113,7 +113,7 @@ private static BigDecimal maximalValue(DecimalType decimalType)
         return new BigDecimal(format("+%s.%s", "9".repeat(decimalType.getPrecision() - decimalType.getScale()), "9".repeat(decimalType.getScale())));
     }
 
-    public static Predicate buildPredicate(
+    public static TupleDomainParquetPredicate buildPredicate(
             MessageType requestedSchema,
             TupleDomain<ColumnDescriptor> parquetTupleDomain,
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
@@ -130,7 +130,7 @@ public static Predicate buildPredicate(
     }
 
     public static boolean predicateMatches(
-            Predicate parquetPredicate,
+            TupleDomainParquetPredicate parquetPredicate,
             BlockMetaData block,
             ParquetDataSource dataSource,
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
@@ -150,7 +150,7 @@ public static boolean predicateMatches(
         // Perform column index and dictionary lookups only for the subset of columns where it can be useful.
         // This prevents unnecessary filesystem reads and decoding work when the predicate on a column comes from
         // file-level min/max stats or more generally when the predicate selects a range equal to or wider than row-group min/max.
-        Predicate indexPredicate = new TupleDomainParquetPredicate(parquetTupleDomain, candidateColumns.get(), timeZone);
+        TupleDomainParquetPredicate indexPredicate = new TupleDomainParquetPredicate(parquetTupleDomain, candidateColumns.get(), timeZone);
 
         // Page stats is finer grained but relatively more expensive, so we do the filtering after above block filtering.
         if (columnIndexStore.isPresent() && !indexPredicate.matches(block.getRowCount(), columnIndexStore.get(), dataSource.getId())) {
@@ -182,7 +182,7 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
     }
 
     private static boolean dictionaryPredicatesMatch(
-            Predicate parquetPredicate,
+            TupleDomainParquetPredicate parquetPredicate,
             BlockMetaData blockMetadata,
             ParquetDataSource dataSource,
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
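
With the interface gone, buildPredicate returns the concrete TupleDomainParquetPredicate and predicateMatches accepts it directly; the connector call sites later in this commit (hive, hudi, iceberg) feed one into the other inside the same row-group pruning loop. Below is a minimal sketch of that loop's shape, not code from this commit: the BlockMetaData record and the matches predicate are hypothetical stand-ins, since the full predicateMatches parameter list is collapsed in this diff.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch only. BlockMetaData mimics org.apache.parquet.hadoop.metadata.BlockMetaData,
// and `matches` stands in for the full PredicateUtils.predicateMatches(...) call.
final class RowGroupPruningSketch
{
    record BlockMetaData(long rowCount) {}

    static List<BlockMetaData> pruneRowGroups(List<BlockMetaData> rowGroups, Predicate<BlockMetaData> matches)
    {
        long nextStart = 0; // running row position, mirroring `long nextStart = 0` in the call sites
        List<BlockMetaData> blocks = new ArrayList<>();
        for (BlockMetaData block : rowGroups) {
            if (matches.test(block)) {
                // keep this row group; the real call sites also record nextStart as its first row position
                blocks.add(block);
            }
            nextStart += block.rowCount();
        }
        return blocks;
    }
}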
@@ -78,7 +78,6 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 public class TupleDomainParquetPredicate
-        implements Predicate
 {
     private final TupleDomain<ColumnDescriptor> effectivePredicate;
     private final List<ColumnDescriptor> columns;
@@ -91,7 +90,21 @@ public TupleDomainParquetPredicate(TupleDomain<ColumnDescriptor> effectivePredicate, List<ColumnDescriptor> columns, DateTimeZone timeZone)
         this.timeZone = requireNonNull(timeZone, "timeZone is null");
     }
 
-    @Override
+    /**
+     * Should the Parquet Reader process a file section with the specified statistics,
+     * and if it should, then return the columns that are candidates for further inspection of more
+     * granular statistics from column index and dictionary.
+     *
+     * @param numberOfRows the number of rows in the segment; this can be used with
+     * Statistics to determine if a column is only null
+     * @param statistics column statistics
+     * @param id Parquet file name
+     *
+     * @return Optional.empty() if statistics were sufficient to eliminate the file section.
+     * Otherwise, a list of columns for which page-level indices and dictionary could be consulted
+     * to potentially eliminate the file section. An Optional with an empty list is returned if there is
+     * going to be no benefit in looking at column index or dictionary for any column.
+     */
     public Optional<List<ColumnDescriptor>> getIndexLookupCandidates(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id)
             throws ParquetCorruptionException
     {
@@ -137,7 +150,13 @@ public Optional<List<ColumnDescriptor>> getIndexLookupCandidates(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id)
         return Optional.of(candidateColumns.build());
     }
 
-    @Override
+    /**
+     * Should the Parquet Reader process a file section with the specified dictionary based on that
+     * single dictionary. This is safe to check repeatedly to avoid loading more parquet dictionaries
+     * if the section can already be eliminated.
+     *
+     * @param dictionary The single column dictionary
+     */
     public boolean matches(DictionaryDescriptor dictionary)
     {
         requireNonNull(dictionary, "dictionary is null");
@@ -152,7 +171,14 @@ public boolean matches(DictionaryDescriptor dictionary)
         return effectivePredicateDomain == null || effectivePredicateMatches(effectivePredicateDomain, dictionary);
     }
 
-    @Override
+    /**
+     * Should the Parquet Reader process a file section with the specified statistics.
+     *
+     * @param numberOfRows the number of rows in the segment; this can be used with
+     * Statistics to determine if a column is only null
+     * @param columnIndexStore column index (statistics) store
+     * @param id Parquet file name
+     */
     public boolean matches(long numberOfRows, ColumnIndexStore columnIndexStore, ParquetDataSourceId id)
             throws ParquetCorruptionException
     {
@@ -189,7 +215,12 @@ public boolean matches(long numberOfRows, ColumnIndexStore columnIndexStore, ParquetDataSourceId id)
         return true;
     }
 
-    @Override
+    /**
+     * Convert Predicate to Parquet filter if possible.
+     *
+     * @param timeZone current Parquet timezone
+     * @return Converted Parquet filter, or Optional.empty() if conversion is not possible
+     */
     public Optional<FilterPredicate> toParquetFilter(DateTimeZone timeZone)
     {
         return Optional.ofNullable(convertToParquetFilter(timeZone));
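
The three methods documented above form a small protocol: getIndexLookupCandidates answers with Optional.empty() (prune the section), a non-empty candidate list (consult finer-grained stats), or an empty list (read the section, indexes will not help), and matches(long, ColumnIndexStore, ParquetDataSourceId) performs the follow-up check. A minimal sketch of a caller driving that protocol, mirroring the predicateMatches logic shown earlier; the parquet-mr import paths for ColumnIndexStore and Statistics are assumptions, not taken from this diff.

import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.predicate.TupleDomainParquetPredicate;
import io.trino.spi.predicate.TupleDomain;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.joda.time.DateTimeZone;

final class IndexLookupProtocolSketch
{
    static boolean shouldReadRowGroup(
            TupleDomainParquetPredicate predicate,
            TupleDomain<ColumnDescriptor> parquetTupleDomain,
            long rowCount,
            Map<ColumnDescriptor, Statistics<?>> statistics,
            Optional<ColumnIndexStore> columnIndexStore,
            ParquetDataSourceId id,
            DateTimeZone timeZone)
            throws ParquetCorruptionException
    {
        Optional<List<ColumnDescriptor>> candidates = predicate.getIndexLookupCandidates(rowCount, statistics, id);
        if (candidates.isEmpty()) {
            // Row-group min/max statistics alone eliminated this file section
            return false;
        }
        if (columnIndexStore.isPresent() && !candidates.get().isEmpty()) {
            // Consult page-level column indexes, but only for the candidate columns
            TupleDomainParquetPredicate indexPredicate =
                    new TupleDomainParquetPredicate(parquetTupleDomain, candidates.get(), timeZone);
            if (!indexPredicate.matches(rowCount, columnIndexStore.get(), id)) {
                return false;
            }
        }
        return true;
    }
}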
@@ -31,7 +31,7 @@
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.ParquetWriteValidation;
 import io.trino.parquet.PrimitiveField;
-import io.trino.parquet.predicate.Predicate;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.FilteredOffsetIndex.OffsetRange;
 import io.trino.plugin.base.metrics.LongCount;
 import io.trino.spi.Page;
@@ -162,7 +162,7 @@ public ParquetReader(
             AggregatedMemoryContext memoryContext,
             ParquetReaderOptions options,
             Function<Exception, RuntimeException> exceptionTransform,
-            Optional<Predicate> parquetPredicate,
+            Optional<TupleDomainParquetPredicate> parquetPredicate,
             List<Optional<ColumnIndexStore>> columnIndexStore,
             Optional<ParquetWriteValidation> writeValidation)
             throws IOException
@@ -24,7 +24,7 @@
 import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.ParquetWriteValidation;
-import io.trino.parquet.predicate.Predicate;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
 import io.trino.parquet.reader.ParquetReaderColumn;
@@ -223,7 +223,7 @@ public static ReaderPageSource createPageSource(
                 ? TupleDomain.all()
                 : getParquetTupleDomain(descriptorsByPath, effectivePredicate, fileSchema, useColumnNames);
 
-        Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
+        TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
 
         long nextStart = 0;
         ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
@@ -21,7 +21,7 @@
 import io.trino.parquet.ParquetDataSource;
 import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetReaderOptions;
-import io.trino.parquet.predicate.Predicate;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
 import io.trino.parquet.reader.ParquetReaderColumn;
@@ -206,7 +206,7 @@ private static ConnectorPageSource createPageSource(
                 ? TupleDomain.all()
                 : getParquetTupleDomain(descriptorsByPath, hudiSplit.getPredicate(), fileSchema, useColumnNames);
 
-        Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
+        TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone);
 
         long nextStart = 0;
         ImmutableList.Builder<BlockMetaData> blocks = ImmutableList.builder();
@@ -39,7 +39,7 @@
 import io.trino.parquet.ParquetDataSource;
 import io.trino.parquet.ParquetDataSourceId;
 import io.trino.parquet.ParquetReaderOptions;
-import io.trino.parquet.predicate.Predicate;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.ParquetReader;
 import io.trino.parquet.reader.ParquetReaderColumn;
@@ -973,7 +973,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
         MessageType requestedSchema = new MessageType(fileSchema.getName(), parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList()));
         Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
         TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
-        Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
+        TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
 
         long nextStart = 0;
         Optional<Long> startRowPosition = Optional.empty();
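
The toParquetFilter method documented earlier is the bridge to parquet-mr's own row filtering. A minimal usage sketch, assuming parquet-mr's FilterCompat API; this wiring is illustrative and not part of this commit.

import java.util.Optional;

import io.trino.parquet.predicate.TupleDomainParquetPredicate;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.joda.time.DateTimeZone;

final class ParquetFilterSketch
{
    static FilterCompat.Filter toFilter(TupleDomainParquetPredicate parquetPredicate, DateTimeZone timeZone)
    {
        // toParquetFilter returns Optional.empty() when the predicate cannot be
        // expressed as a parquet-mr FilterPredicate; fall back to a no-op filter.
        Optional<FilterPredicate> filter = parquetPredicate.toParquetFilter(timeZone);
        return filter.map(FilterCompat::get).orElse(FilterCompat.NOOP);
    }
}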
