
Avoid relying on row-group row count for detecting only-null domain #15388
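The change: an only-null column chunk must be detected by comparing the null count against the column's value count, not the row group's row count; for repeated (array/map) leaf columns the two can differ. A minimal standalone sketch of the before/after check (illustrative only — names and data are made up, not code from this PR):

public class OnlyNullCheckSketch
{
    // Old check: null count compared against the row-group row count.
    static boolean isOnlyNullOld(boolean numNullsSet, long numNulls, long rowGroupRowCount)
    {
        return numNullsSet && numNulls == rowGroupRowCount;
    }

    // New check: null count compared against the column chunk's value count.
    static boolean isOnlyNullNew(boolean numNullsSet, long numNulls, long columnValueCount)
    {
        return numNullsSet && numNulls == columnValueCount;
    }

    public static void main(String[] args)
    {
        // Row group with 2 rows of (id BIGINT, tags ARRAY(VARCHAR)):
        //   row 1: tags = ['a', 'b', NULL]
        //   row 2: tags = [NULL]
        // The leaf column tags.element holds 4 values, 2 of them null.
        long rowCount = 2;
        long valueCount = 4;
        long numNulls = 2;
        System.out.println(isOnlyNullOld(true, numNulls, rowCount));   // true: wrongly prunes live data as only-null
        System.out.println(isOnlyNullNew(true, numNulls, valueCount)); // false: the chunk has non-null values
    }
}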

Merged (2 commits, Dec 17, 2022)
This file was deleted.

@@ -113,7 +113,7 @@ private static BigDecimal maximalValue(DecimalType decimalType)
         return new BigDecimal(format("+%s.%s", "9".repeat(decimalType.getPrecision() - decimalType.getScale()), "9".repeat(decimalType.getScale())));
     }
 
-    public static Predicate buildPredicate(
+    public static TupleDomainParquetPredicate buildPredicate(
             MessageType requestedSchema,
             TupleDomain<ColumnDescriptor> parquetTupleDomain,
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
@@ -130,7 +130,7 @@ public static Predicate buildPredicate(
     }
 
     public static boolean predicateMatches(
-            Predicate parquetPredicate,
+            TupleDomainParquetPredicate parquetPredicate,
             BlockMetaData block,
             ParquetDataSource dataSource,
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
@@ -139,8 +139,12 @@ public static boolean predicateMatches(
             DateTimeZone timeZone)
             throws IOException
     {
+        if (block.getRowCount() == 0) {
+            return false;
+        }
         Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
-        Optional<List<ColumnDescriptor>> candidateColumns = parquetPredicate.getIndexLookupCandidates(block.getRowCount(), columnStatistics, dataSource.getId());
+        Map<ColumnDescriptor, Long> columnValueCounts = getColumnValueCounts(block, descriptorsByPath);
+        Optional<List<ColumnDescriptor>> candidateColumns = parquetPredicate.getIndexLookupCandidates(columnValueCounts, columnStatistics, dataSource.getId());
         if (candidateColumns.isEmpty()) {
             return false;
         }
@@ -150,10 +154,10 @@ public static boolean predicateMatches(
         // Perform column index and dictionary lookups only for the subset of columns where it can be useful.
         // This prevents unnecessary filesystem reads and decoding work when the predicate on a column comes from
         // file-level min/max stats or more generally when the predicate selects a range equal to or wider than row-group min/max.
-        Predicate indexPredicate = new TupleDomainParquetPredicate(parquetTupleDomain, candidateColumns.get(), timeZone);
+        TupleDomainParquetPredicate indexPredicate = new TupleDomainParquetPredicate(parquetTupleDomain, candidateColumns.get(), timeZone);
 
         // Page stats is finer grained but relatively more expensive, so we do the filtering after above block filtering.
-        if (columnIndexStore.isPresent() && !indexPredicate.matches(block.getRowCount(), columnIndexStore.get(), dataSource.getId())) {
+        if (columnIndexStore.isPresent() && !indexPredicate.matches(columnValueCounts, columnIndexStore.get(), dataSource.getId())) {
             return false;
         }
@@ -181,8 +185,20 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
         return statistics.buildOrThrow();
     }
 
+    private static Map<ColumnDescriptor, Long> getColumnValueCounts(BlockMetaData blockMetadata, Map<List<String>, ColumnDescriptor> descriptorsByPath)
+    {
+        ImmutableMap.Builder<ColumnDescriptor, Long> columnValueCounts = ImmutableMap.builder();
+        for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
+            ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
+            if (descriptor != null) {
+                columnValueCounts.put(descriptor, columnMetaData.getValueCount());
+            }
+        }
+        return columnValueCounts.buildOrThrow();
+    }
+
     private static boolean dictionaryPredicatesMatch(
-            Predicate parquetPredicate,
+            TupleDomainParquetPredicate parquetPredicate,
             BlockMetaData blockMetadata,
             ParquetDataSource dataSource,
             Map<List<String>, ColumnDescriptor> descriptorsByPath,
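For context, the two counts used above come straight from the Parquet footer. A hedged standalone sketch (not part of the PR; the file path is a placeholder) that prints BlockMetaData.getRowCount() next to each column chunk's ColumnChunkMetaData.getValueCount():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ValueCountProbe
{
    public static void main(String[] args) throws Exception
    {
        Path path = new Path("file:///tmp/example.parquet"); // placeholder path
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
            for (BlockMetaData block : reader.getFooter().getBlocks()) {
                System.out.println("row group rows = " + block.getRowCount());
                for (ColumnChunkMetaData column : block.getColumns()) {
                    // For flat columns valueCount equals the row count; for repeated
                    // (array/map) leaf columns it can diverge in either direction.
                    System.out.printf("%s: values=%d, nulls=%s%n",
                            column.getPath(),
                            column.getValueCount(),
                            column.getStatistics().isNumNullsSet() ? column.getStatistics().getNumNulls() : "unset");
                }
            }
        }
    }
}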
@@ -78,7 +78,6 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 public class TupleDomainParquetPredicate
-        implements Predicate
 {
     private final TupleDomain<ColumnDescriptor> effectivePredicate;
     private final List<ColumnDescriptor> columns;
@@ -91,13 +90,27 @@ public TupleDomainParquetPredicate(TupleDomain<ColumnDescriptor> effectivePredic
         this.timeZone = requireNonNull(timeZone, "timeZone is null");
     }
 
-    @Override
-    public Optional<List<ColumnDescriptor>> getIndexLookupCandidates(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> statistics, ParquetDataSourceId id)
+    /**
+     * Should the Parquet Reader process a file section with the specified statistics,
+     * and if it should, then return the columns that are candidates for further inspection of more
+     * granular statistics from column index and dictionary.
+     *
+     * @param valueCounts the number of values for a column in the segment; this can be used with
+     * Statistics to determine if a column is only null
+     * @param statistics column statistics
+     * @param id Parquet file name
+     *
+     * @return Optional.empty() if statistics were sufficient to eliminate the file section.
+     * Otherwise, a list of columns for which page-level indices and dictionary could be consulted
+     * to potentially eliminate the file section. An Optional with an empty list is returned if there is
+     * going to be no benefit in looking at column index or dictionary for any column.
+     */
+    public Optional<List<ColumnDescriptor>> getIndexLookupCandidates(
+            Map<ColumnDescriptor, Long> valueCounts,
+            Map<ColumnDescriptor, Statistics<?>> statistics,
+            ParquetDataSourceId id)
             throws ParquetCorruptionException
     {
-        if (numberOfRows == 0) {
-            return Optional.empty();
-        }
         if (effectivePredicate.isNone()) {
             return Optional.empty();
         }
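A hedged caller sketch of this tri-state contract (an assumption modeled on predicateMatches above; shouldReadRowGroup is a hypothetical helper, and the types are the same ones imported by this file):

    static boolean shouldReadRowGroup(
            TupleDomainParquetPredicate predicate,
            Map<ColumnDescriptor, Long> columnValueCounts,
            Map<ColumnDescriptor, Statistics<?>> columnStatistics,
            ParquetDataSourceId id)
            throws ParquetCorruptionException
    {
        Optional<List<ColumnDescriptor>> candidates =
                predicate.getIndexLookupCandidates(columnValueCounts, columnStatistics, id);
        if (candidates.isEmpty()) {
            return false; // min/max stats alone eliminated the row group
        }
        if (candidates.get().isEmpty()) {
            return true; // must read it; index and dictionary lookups cannot narrow further
        }
        // Otherwise, probe column index and dictionaries for candidates.get() only.
        return true;
    }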
@@ -118,10 +131,14 @@ public Optional<List<ColumnDescriptor>> getIndexLookupCandidates(long numberOfRo
                 continue;
             }
 
+            Long columnValueCount = valueCounts.get(column);
+            if (columnValueCount == null) {
+                throw new IllegalArgumentException(format("Missing columnValueCount for column %s in %s", column, id));
+            }
             Domain domain = getDomain(
                     column,
                     effectivePredicateDomain.getType(),
-                    numberOfRows,
+                    columnValueCount,
                     columnStatistics,
                     id,
                     timeZone);

Review thread on the new null check (resolved):

Reviewer (Member): Would this come up in the case where you add a column to a table and then insert new data? The old data files would not have the new column in the stats.

Author: In case we reach here for that scenario, I assume that the above columnStatistics == null check will help to bail out.

Reviewer: Gotcha, yep, didn't see that. At least in Iceberg it is possible to configure collection of value counts but not min/max stats. I guess in that case we'd still ignore them?

Author: Yeah, without min/max stats but with nulls count and value count, the most we can do is prune an only-null row group for an IS NOT NULL predicate and prune a non-nullable row group for an IS NULL predicate. We could consider doing that if that's a practically useful thing.
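The counts-only pruning the author sketches in that last reply could look roughly like this (hypothetical, not implemented by this PR; Domain is io.trino.spi.predicate.Domain):

import io.trino.spi.predicate.Domain;

final class NullCountPruning
{
    private NullCountPruning() {}

    // True if the row group can be skipped using only null/value counts, without min/max stats.
    static boolean canPruneWithCountsOnly(Domain predicateDomain, long columnValueCount, long nullCount)
    {
        // IS NULL predicate, but the column chunk contains no nulls.
        if (predicateDomain.isOnlyNull() && nullCount == 0) {
            return true;
        }
        // IS NOT NULL (or any null-rejecting predicate), but the column chunk is entirely null.
        if (!predicateDomain.isNullAllowed() && nullCount == columnValueCount) {
            return true;
        }
        return false;
    }
}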
@@ -137,7 +154,13 @@ public Optional<List<ColumnDescriptor>> getIndexLookupCandidates(long numberOfRo
         return Optional.of(candidateColumns.build());
     }
 
-    @Override
+    /**
+     * Should the Parquet Reader process a file section with the specified dictionary based on that
+     * single dictionary. This is safe to check repeatedly to avoid loading more parquet dictionaries
+     * if the section can already be eliminated.
+     *
+     * @param dictionary The single column dictionary
+     */
     public boolean matches(DictionaryDescriptor dictionary)
     {
         requireNonNull(dictionary, "dictionary is null");
@@ -152,16 +175,18 @@ public boolean matches(DictionaryDescriptor dictionary)
         return effectivePredicateDomain == null || effectivePredicateMatches(effectivePredicateDomain, dictionary);
     }
 
-    @Override
-    public boolean matches(long numberOfRows, ColumnIndexStore columnIndexStore, ParquetDataSourceId id)
+    /**
+     * Should the Parquet Reader process a file section with the specified statistics.
+     *
+     * @param valueCounts the number of values for a column in the segment; this can be used with
+     * Statistics to determine if a column is only null
+     * @param columnIndexStore column index (statistics) store
+     * @param id Parquet file name
+     */
+    public boolean matches(Map<ColumnDescriptor, Long> valueCounts, ColumnIndexStore columnIndexStore, ParquetDataSourceId id)
             throws ParquetCorruptionException
     {
         requireNonNull(columnIndexStore, "columnIndexStore is null");
 
-        if (numberOfRows == 0) {
-            return false;
-        }
-
         if (effectivePredicate.isNone()) {
             return false;
         }
@@ -180,7 +205,11 @@ public boolean matches(long numberOfRows, ColumnIndexStore columnIndexStore, Par
                 continue;
             }
 
-            Domain domain = getDomain(effectivePredicateDomain.getType(), numberOfRows, columnIndex, id, column, timeZone);
+            Long columnValueCount = valueCounts.get(column);
+            if (columnValueCount == null) {
+                throw new IllegalArgumentException(format("Missing columnValueCount for column %s in %s", column, id));
+            }
+            Domain domain = getDomain(effectivePredicateDomain.getType(), columnValueCount, columnIndex, id, column, timeZone);
             if (!effectivePredicateDomain.overlaps(domain)) {
                 return false;
             }
@@ -189,7 +218,12 @@ public boolean matches(long numberOfRows, ColumnIndexStore columnIndexStore, Par
         return true;
     }
 
-    @Override
+    /**
+     * Convert Predicate to Parquet filter if possible.
+     *
+     * @param timeZone current Parquet timezone
+     * @return Converted Parquet filter, or Optional.empty() if conversion is not possible
+     */
     public Optional<FilterPredicate> toParquetFilter(DateTimeZone timeZone)
     {
         return Optional.ofNullable(convertToParquetFilter(timeZone));
@@ -204,7 +238,7 @@ private boolean effectivePredicateMatches(Domain effectivePredicateDomain, Dicti
     public static Domain getDomain(
             ColumnDescriptor column,
             Type type,
-            long rowCount,
+            long columnValuesCount,
             Statistics<?> statistics,
             ParquetDataSourceId id,
             DateTimeZone timeZone)
@@ -214,7 +248,7 @@ public static Domain getDomain(
             return Domain.all(type);
         }
 
-        if (statistics.isNumNullsSet() && statistics.getNumNulls() == rowCount) {
+        if (statistics.isNumNullsSet() && statistics.getNumNulls() == columnValuesCount) {
             return Domain.onlyNull(type);
         }
@@ -406,7 +440,7 @@ private static Domain getDomain(
     @VisibleForTesting
     public static Domain getDomain(
             Type type,
-            long rowCount,
+            long columnValuesCount,
             ColumnIndex columnIndex,
             ParquetDataSourceId id,
             ColumnDescriptor descriptor,
@@ -435,7 +469,7 @@ public static Domain getDomain(
                 .sum();
         boolean hasNullValue = totalNullCount > 0;
 
-        if (hasNullValue && totalNullCount == rowCount) {
+        if (hasNullValue && totalNullCount == columnValuesCount) {
             return Domain.onlyNull(type);
         }
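For reference, a minimal sketch (an assumption mirroring the hunk above) of the only-null test against a parquet-mr ColumnIndex: per-page null counts are summed and compared with the column's value count rather than the row-group row count:

import org.apache.parquet.internal.column.columnindex.ColumnIndex;

final class ColumnIndexNullCheck
{
    private ColumnIndexNullCheck() {}

    // True when every value in the column chunk is null, judged from page-level null counts.
    static boolean isOnlyNull(ColumnIndex columnIndex, long columnValueCount)
    {
        long totalNullCount = columnIndex.getNullCounts().stream()
                .mapToLong(Long::longValue)
                .sum();
        return totalNullCount > 0 && totalNullCount == columnValueCount;
    }
}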
@@ -31,7 +31,7 @@
 import io.trino.parquet.ParquetReaderOptions;
 import io.trino.parquet.ParquetWriteValidation;
 import io.trino.parquet.PrimitiveField;
-import io.trino.parquet.predicate.Predicate;
+import io.trino.parquet.predicate.TupleDomainParquetPredicate;
 import io.trino.parquet.reader.FilteredOffsetIndex.OffsetRange;
 import io.trino.plugin.base.metrics.LongCount;
 import io.trino.spi.Page;
@@ -162,7 +162,7 @@ public ParquetReader(
             AggregatedMemoryContext memoryContext,
             ParquetReaderOptions options,
             Function<Exception, RuntimeException> exceptionTransform,
-            Optional<Predicate> parquetPredicate,
+            Optional<TupleDomainParquetPredicate> parquetPredicate,
             List<Optional<ColumnIndexStore>> columnIndexStore,
             Optional<ParquetWriteValidation> writeValidation)
             throws IOException