Support data_size when analyzing in Delta Lake
ebyhr committed Jun 15, 2022
1 parent 4dbf41f commit c440595
Showing 9 changed files with 183 additions and 60 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/delta-lake.rst
@@ -505,7 +505,7 @@ Table statistics
^^^^^^^^^^^^^^^^

You can use :doc:`/sql/analyze` statements in Trino to populate the table
-statistics in Delta Lake. Number of distinct values (NDV)
+statistics in Delta Lake. Data size and number of distinct values (NDV)
statistics are supported, while Minimum value, maximum value, and null value
count statistics are not supported. The :doc:`cost-based optimizer
</optimizer/cost-based-optimizations>` then uses these statistics to improve
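With this change, ANALYZE collects a data size estimate for variable-width columns in addition to the NDV summaries, and SHOW STATS reports it in the data_size column. For illustration only — the JDBC URL, credentials, and table name below are assumptions, not part of this commit — a minimal sketch of triggering and inspecting the new statistics from Java:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class AnalyzeDataSizeExample
{
    public static void main(String[] args)
            throws Exception
    {
        // Assumed connection details: a local Trino server with a Delta Lake catalog named "delta"
        try (Connection connection = DriverManager.getConnection("jdbc:trino://localhost:8080/delta/default", "admin", null);
                Statement statement = connection.createStatement()) {
            // Populate the extended statistics (data size and NDV summaries) for the table
            statement.execute("ANALYZE nation");

            // data_size is now filled in for variable-width columns such as VARCHAR
            try (ResultSet resultSet = statement.executeQuery("SHOW STATS FOR nation")) {
                while (resultSet.next()) {
                    System.out.printf("%s: data_size=%s, distinct_values_count=%s%n",
                            resultSet.getString("column_name"),
                            resultSet.getString("data_size"),
                            resultSet.getString("distinct_values_count"));
                }
            }
        }
    }
}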
DeltaLakeMetadata.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.airlift.json.JsonCodec;
@@ -59,6 +60,7 @@
import io.trino.plugin.hive.security.AccessControlMetadata;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.CatalogSchemaName;
@@ -95,11 +97,13 @@
import io.trino.spi.security.RoleGrant;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ColumnStatisticMetadata;
+import io.trino.spi.statistics.ColumnStatisticType;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.statistics.TableStatisticsMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.HyperLogLogType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
@@ -128,6 +132,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@@ -195,6 +200,7 @@
import static io.trino.spi.predicate.ValueSet.ofRanges;
import static io.trino.spi.statistics.ColumnStatisticType.MAX_VALUE;
import static io.trino.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES_SUMMARY;
+import static io.trino.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
@@ -242,6 +248,10 @@ public class DeltaLakeMetadata
// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty()));
+private static final Set<ColumnStatisticType> SUPPORTED_STATISTICS_TYPE = ImmutableSet.<ColumnStatisticType>builder()
+        .add(TOTAL_SIZE_IN_BYTES)
+        .add(NUMBER_OF_DISTINCT_VALUES_SUMMARY)
+        .build();

private final DeltaLakeMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
@@ -1932,8 +1942,14 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
analyzeColumnNames
.map(columnNames -> columnNames.contains(columnMetadata.getName()))
.orElse(true))
-.map(columnMetadata -> new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY))
-.forEach(columnStatistics::add);
+.forEach(columnMetadata -> {
+    if (!(columnMetadata.getType() instanceof FixedWidthType)) {
+        if (statistics.isEmpty() || totalSizeStatisticsExists(statistics.get().getColumnStatistics(), columnMetadata.getName())) {
+            columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), TOTAL_SIZE_IN_BYTES));
+        }
+    }
+    columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY));
+});

// collect max(file modification time) for sake of incremental ANALYZE
columnStatistics.add(new ColumnStatisticMetadata(FILE_MODIFIED_TIME_COLUMN_NAME, MAX_VALUE));
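Two details of the rewritten loop above are worth spelling out. Data size (TOTAL_SIZE_IN_BYTES) is only requested for columns whose type is not a FixedWidthType, presumably because a fixed-width column's size can be derived from the row count rather than collected. It is also skipped when extended statistics already exist for the table but lack a total size for the column, so an incremental ANALYZE never merges a fresh partial size into statistics that cannot account for previously analyzed files. A minimal sketch of the derivation idea — estimateFixedWidthDataSize is a hypothetical helper, not code from this commit:

import io.trino.spi.statistics.Estimate;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.Type;

public final class FixedWidthDataSize
{
    private FixedWidthDataSize() {}

    // For fixed-width types (BIGINT, DOUBLE, DATE, ...) every value occupies the same
    // number of bytes, so a data size estimate falls out of the row count and no
    // TOTAL_SIZE_IN_BYTES aggregation needs to run during ANALYZE.
    public static Estimate estimateFixedWidthDataSize(Type type, double rowCount)
    {
        if (type instanceof FixedWidthType) {
            return Estimate.of(rowCount * ((FixedWidthType) type).getFixedSize());
        }
        // Variable-width types (VARCHAR, VARBINARY, ...) have no such shortcut,
        // which is why the connector asks ANALYZE to collect their total size.
        return Estimate.unknown();
    }
}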
@@ -1958,6 +1974,11 @@ private static boolean shouldCollectExtendedStatistics(ColumnMetadata columnMeta
return true;
}

+private static boolean totalSizeStatisticsExists(Map<String, DeltaLakeColumnStatistics> statistics, String columnName)
+{
+    return statistics.containsKey(columnName) && statistics.get(columnName).getTotalSizeInBytes().isPresent();
+}

@Override
public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle)
{
@@ -2130,26 +2151,53 @@ private static Map<String, DeltaLakeColumnStatistics> toDeltaLakeColumnStatistic
{
// Only statistics for whole table are collected
ComputedStatistics singleStatistics = Iterables.getOnlyElement(computedStatistics);
-return singleStatistics.getColumnStatistics().entrySet().stream()
-        .filter(not(entry -> entry.getKey().getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)))
-        .collect(toImmutableMap(
-                entry -> entry.getKey().getColumnName(),
-                entry -> {
-                    ColumnStatisticMetadata columnStatisticMetadata = entry.getKey();
-                    if (columnStatisticMetadata.getStatisticType() != NUMBER_OF_DISTINCT_VALUES_SUMMARY) {
-                        throw new TrinoException(
-                                GENERIC_INTERNAL_ERROR,
-                                "Unexpected statistics type " + columnStatisticMetadata.getStatisticType() + " found for column " + columnStatisticMetadata.getColumnName());
-                    }
-                    if (entry.getValue().isNull(0)) {
-                        return DeltaLakeColumnStatistics.create(HyperLogLog.newInstance(4096)); // empty HLL with number of buckets used by $approx_set
-                    }
-                    else {
-                        Slice serializedSummary = (Slice) blockToNativeValue(HyperLogLogType.HYPER_LOG_LOG, entry.getValue());
-                        return DeltaLakeColumnStatistics.create(HyperLogLog.newInstance(serializedSummary));
-                    }
-                }));
+return createColumnToComputedStatisticsMap(singleStatistics.getColumnStatistics()).entrySet().stream()
+        .collect(toImmutableMap(Map.Entry::getKey, entry -> createDeltaLakeColumnStatistics(entry.getValue())));
}

+private static Map<String, Map<ColumnStatisticType, Block>> createColumnToComputedStatisticsMap(Map<ColumnStatisticMetadata, Block> computedStatistics)
+{
+    ImmutableTable.Builder<String, ColumnStatisticType, Block> result = ImmutableTable.builder();
+    computedStatistics.forEach((metadata, block) -> {
+        if (metadata.getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)) {
+            return;
+        }
+        if (!SUPPORTED_STATISTICS_TYPE.contains(metadata.getStatisticType())) {
+            throw new TrinoException(
+                    GENERIC_INTERNAL_ERROR,
+                    "Unexpected statistics type " + metadata.getStatisticType() + " found for column " + metadata.getColumnName());
+        }
+
+        result.put(metadata.getColumnName(), metadata.getStatisticType(), block);
+    });
+    return result.buildOrThrow().rowMap();
+}
+
+private static DeltaLakeColumnStatistics createDeltaLakeColumnStatistics(Map<ColumnStatisticType, Block> computedStatistics)
+{
+    OptionalLong totalSize = OptionalLong.empty();
+    if (computedStatistics.containsKey(TOTAL_SIZE_IN_BYTES)) {
+        totalSize = getLongValue(computedStatistics.get(TOTAL_SIZE_IN_BYTES));
+    }
+    HyperLogLog ndvSummary = getHyperLogLogForNdv(computedStatistics.get(NUMBER_OF_DISTINCT_VALUES_SUMMARY));
+    return DeltaLakeColumnStatistics.create(totalSize, ndvSummary);
+}
+
+private static OptionalLong getLongValue(Block block)
+{
+    if (block.isNull(0)) {
+        return OptionalLong.of(0);
+    }
+    return OptionalLong.of(BIGINT.getLong(block, 0));
+}
+
+private static HyperLogLog getHyperLogLogForNdv(Block block)
+{
+    if (block.isNull(0)) {
+        return HyperLogLog.newInstance(4096); // number of buckets used by $approx_set
+    }
+    Slice serializedSummary = (Slice) blockToNativeValue(HyperLogLogType.HYPER_LOG_LOG, block);
+    return HyperLogLog.newInstance(serializedSummary);
+}

private static Optional<Instant> getMaxFileModificationTime(Collection<ComputedStatistics> computedStatistics)
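The new createColumnToComputedStatisticsMap helper pivots the flat (column name, statistic type) -> Block map produced by the engine into a per-column map, skipping the synthetic $file_modified_time entry and rejecting statistic types outside SUPPORTED_STATISTICS_TYPE. The pivot relies on Guava's ImmutableTable.rowMap(). A small self-contained illustration with plain Strings in place of Trino Blocks (assuming a Guava version with buildOrThrow, which the commit itself uses):

import com.google.common.collect.ImmutableTable;

import java.util.Map;

public class RowMapExample
{
    public static void main(String[] args)
    {
        // Entries keyed by (column, statistic type) ...
        Map<String, Map<String, String>> byColumn = ImmutableTable.<String, String, String>builder()
                .put("comment", "TOTAL_SIZE_IN_BYTES", "1857")
                .put("comment", "NUMBER_OF_DISTINCT_VALUES_SUMMARY", "<hll>")
                .put("nationkey", "NUMBER_OF_DISTINCT_VALUES_SUMMARY", "<hll>")
                .buildOrThrow()
                .rowMap();

        // ... become a map keyed by column name with a nested map per statistic type:
        // {comment={TOTAL_SIZE_IN_BYTES=1857, NUMBER_OF_DISTINCT_VALUES_SUMMARY=<hll>}, nationkey={NUMBER_OF_DISTINCT_VALUES_SUMMARY=<hll>}}
        System.out.println(byColumn);
    }
}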
@@ -352,6 +352,7 @@ else if (isValidInRange(minValue)) {
if (statistics.isPresent()) {
DeltaLakeColumnStatistics deltaLakeColumnStatistics = statistics.get().getColumnStatistics().get(column.getName());
if (deltaLakeColumnStatistics != null && column.getColumnType() != PARTITION_KEY) {
+deltaLakeColumnStatistics.getTotalSizeInBytes().ifPresent(size -> columnStatsBuilder.setDataSize(Estimate.of(size)));
columnStatsBuilder.setDistinctValuesCount(Estimate.of(deltaLakeColumnStatistics.getNdvSummary().cardinality()));
}
}
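On the read path above, the persisted total size is surfaced to the engine as the column's data-size estimate alongside the NDV estimate, and the cost-based optimizer consumes both. A hedged sketch of the resulting shape — toEngineStatistics is a hypothetical helper, not code from this commit:

import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.Estimate;

import java.util.OptionalLong;

public final class ColumnStatisticsShape
{
    private ColumnStatisticsShape() {}

    public static ColumnStatistics toEngineStatistics(OptionalLong totalSizeInBytes, long ndvCardinality)
    {
        ColumnStatistics.Builder builder = ColumnStatistics.builder();
        // The stored total size (when present) becomes the data-size estimate ...
        totalSizeInBytes.ifPresent(size -> builder.setDataSize(Estimate.of(size)));
        // ... and the HyperLogLog cardinality becomes the distinct-values estimate.
        builder.setDistinctValuesCount(Estimate.of(ndvCardinality));
        return builder.build();
    }
}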
DeltaLakeColumnStatistics.java
@@ -19,32 +19,43 @@
import io.airlift.stats.cardinality.HyperLogLog;

import java.util.Base64;
+import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

public class DeltaLakeColumnStatistics
{
+private final OptionalLong totalSizeInBytes;
private final HyperLogLog ndvSummary;

@JsonCreator
public static DeltaLakeColumnStatistics create(
@JsonProperty("totalSizeInBytes") OptionalLong totalSizeInBytes,
@JsonProperty("ndvSummary") String ndvSummaryBase64)
{
+requireNonNull(totalSizeInBytes, "totalSizeInBytes is null");
requireNonNull(ndvSummaryBase64, "ndvSummaryBase64 is null");
byte[] ndvSummaryBytes = Base64.getDecoder().decode(ndvSummaryBase64);
-return new DeltaLakeColumnStatistics(HyperLogLog.newInstance(Slices.wrappedBuffer(ndvSummaryBytes)));
+return new DeltaLakeColumnStatistics(totalSizeInBytes, HyperLogLog.newInstance(Slices.wrappedBuffer(ndvSummaryBytes)));
}

-public static DeltaLakeColumnStatistics create(HyperLogLog ndvSummary)
+public static DeltaLakeColumnStatistics create(OptionalLong totalSizeInBytes, HyperLogLog ndvSummary)
{
-return new DeltaLakeColumnStatistics(ndvSummary);
+return new DeltaLakeColumnStatistics(totalSizeInBytes, ndvSummary);
}

-private DeltaLakeColumnStatistics(HyperLogLog ndvSummary)
+private DeltaLakeColumnStatistics(OptionalLong totalSizeInBytes, HyperLogLog ndvSummary)
{
+this.totalSizeInBytes = requireNonNull(totalSizeInBytes, "totalSizeInBytes is null");
this.ndvSummary = requireNonNull(ndvSummary, "ndvSummary is null");
}

+@JsonProperty
+public OptionalLong getTotalSizeInBytes()
+{
+    return totalSizeInBytes;
+}

@JsonProperty("ndvSummary")
public String getNdvSummaryBase64()
{
@@ -58,8 +69,17 @@ public HyperLogLog getNdvSummary()

public DeltaLakeColumnStatistics update(DeltaLakeColumnStatistics newStatistics)
{
+OptionalLong totalSizeInBytes = mergeIntegerStatistics(this.totalSizeInBytes, newStatistics.totalSizeInBytes);
HyperLogLog ndvSummary = HyperLogLog.newInstance(this.ndvSummary.serialize());
ndvSummary.mergeWith(newStatistics.ndvSummary);
-return new DeltaLakeColumnStatistics(ndvSummary);
+return new DeltaLakeColumnStatistics(totalSizeInBytes, ndvSummary);
}

+private static OptionalLong mergeIntegerStatistics(OptionalLong first, OptionalLong second)
+{
+    if (first.isPresent() && second.isPresent()) {
+        return OptionalLong.of(first.getAsLong() + second.getAsLong());
+    }
+    return OptionalLong.empty();
+}
}
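The update method merges statistics from previously analyzed files with those of newly analyzed ones during incremental ANALYZE: total sizes add up, the NDV sketches are merged, and if either side is missing a total size the merged value is left empty rather than reporting a partial sum. A hedged usage sketch, placed for brevity in the same package as DeltaLakeColumnStatistics and assuming airlift's HyperLogLog exposes add(long) as its cardinality package does:

import io.airlift.stats.cardinality.HyperLogLog;

import java.util.OptionalLong;

public class MergeSemanticsExample
{
    public static void main(String[] args)
    {
        HyperLogLog first = HyperLogLog.newInstance(4096);
        first.add(1);
        HyperLogLog second = HyperLogLog.newInstance(4096);
        second.add(2);

        DeltaLakeColumnStatistics a = DeltaLakeColumnStatistics.create(OptionalLong.of(100), first);
        DeltaLakeColumnStatistics b = DeltaLakeColumnStatistics.create(OptionalLong.of(50), second);

        // Sizes add up and the NDV sketches merge.
        DeltaLakeColumnStatistics merged = a.update(b);
        System.out.println(merged.getTotalSizeInBytes());           // OptionalLong[150]
        System.out.println(merged.getNdvSummary().cardinality());   // ~2

        // A missing size on either side empties the merged size instead of reporting a partial sum.
        DeltaLakeColumnStatistics noSize = DeltaLakeColumnStatistics.create(OptionalLong.empty(), second);
        System.out.println(a.update(noSize).getTotalSizeInBytes()); // OptionalLong.empty
    }
}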
@@ -1155,8 +1155,8 @@ public void testAnalyze()
"VALUES " +
"('nationkey', null, 25.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', null, 25.0, 0.0, null, null, null)," +
"('name', null, 25.0, 0.0, null, null, null)," +
"('comment', 1857.0, 25.0, 0.0, null, null, null)," +
"('name', 177.0, 25.0, 0.0, null, null, null)," +
"(null, null, null, null, 25.0, null, null)");
}
