Commit

Support converting column stats on row type to json in Delta Lake
ebyhr committed Oct 4, 2022
1 parent 09bc0a9 · commit 497e650
Showing 12 changed files with 109 additions and 3 deletions.
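For context: this change makes the connector serialize min/max values and null counts for `row` (struct) columns as nested JSON objects when writing the stats string, instead of skipping those columns, matching the format Databricks writes (see the `json_stats_on_row_type` test resources further down). A minimal standalone sketch of the resulting shape, using plain Jackson rather than the connector code; the class name is hypothetical and a single `col row(x bigint)` column is assumed:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

public class RowStatsJsonShape
{
    public static void main(String[] args)
            throws Exception
    {
        // Stats for one file of a table with a single column "col" of type row(x bigint)
        Map<String, Object> stats = new LinkedHashMap<>();
        stats.put("numRecords", 1);
        stats.put("minValues", Map.of("col", Map.of("x", 1)));
        stats.put("maxValues", Map.of("col", Map.of("x", 1)));
        stats.put("nullCount", Map.of("col", Map.of("x", 0)));

        // Prints: {"numRecords":1,"minValues":{"col":{"x":1}},"maxValues":{"col":{"x":1}},"nullCount":{"col":{"x":0}}}
        System.out.println(new ObjectMapper().writeValueAsString(stats));
    }
}
```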
@@ -13,9 +13,11 @@
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.plugin.base.type.DecodedTimestamp;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.type.ArrayType;
@@ -67,6 +69,7 @@
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeUtils.readNativeValue;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
@@ -154,8 +157,7 @@ public static Map<String, Object> toJsonValues(Map<String, Type> columnTypeMappi
Map<String, Object> jsonValues = new HashMap<>();
for (Map.Entry<String, Object> value : values.entrySet()) {
Type type = columnTypeMapping.get(value.getKey());
- // TODO: Add support for row type
- if (type instanceof ArrayType || type instanceof MapType || type instanceof RowType) {
+ if (type instanceof ArrayType || type instanceof MapType) {
continue;
}
jsonValues.put(value.getKey(), toJsonValue(columnTypeMapping.get(value.getKey()), value.getValue()));
@@ -197,6 +199,16 @@ private static Object toJsonValue(Type type, @Nullable Object value)
Instant ts = Instant.ofEpochMilli(unpackMillisUtc((long) value));
return ISO_INSTANT.format(ZonedDateTime.ofInstant(ts, UTC));
}
if (type instanceof RowType rowType) {
Block rowBlock = (Block) value;
Map<String, Object> fieldValues = new HashMap<>(); // Use HashMap because value is nullable
for (int i = 0; i < rowBlock.getPositionCount(); i++) {
RowType.Field field = rowType.getFields().get(i);
Object fieldValue = readNativeValue(field.getType(), rowBlock.getChildren().get(i), i);
fieldValues.put(field.getName().orElseThrow(), toJsonValue(field.getType(), fieldValue));
}
return fieldValues;
}

throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -222,6 +234,37 @@ private static Map<String, Object> jsonEncode(Map<String, Optional<Statistics<?>
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get()));
}

public static Map<String, Object> toNullCounts(Map<String, Type> columnTypeMapping, Map<String, Object> values)
{
ImmutableMap.Builder<String, Object> nullCounts = ImmutableMap.builderWithExpectedSize(values.size());
for (Map.Entry<String, Object> value : values.entrySet()) {
Type type = columnTypeMapping.get(value.getKey());
nullCounts.put(value.getKey(), toNullCount(type, value.getValue()));
}
return nullCounts.buildOrThrow();
}

private static Object toNullCount(Type type, Object value)
{
if (type instanceof RowType rowType) {
Block rowBlock = (Block) value;
ImmutableMap.Builder<String, Object> nullCounts = ImmutableMap.builderWithExpectedSize(rowBlock.getPositionCount());
for (int i = 0; i < rowBlock.getPositionCount(); i++) {
RowType.Field field = rowType.getFields().get(i);
Object nullCount;
if (field.getType() instanceof RowType) {
nullCount = rowBlock.getChildren().get(i).getSingleValueBlock(i);
}
else {
nullCount = rowBlock.getChildren().get(i).getLong(i, 0);
}
nullCounts.put(field.getName().orElseThrow(), toNullCount(field.getType(), nullCount));
}
return nullCounts.buildOrThrow();
}
return value;
}
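The two helpers above mirror the nested shape of Delta's `nullCount` JSON: leaf fields carry a plain count, while row fields carry a nested object of their fields' counts. A rough standalone sketch of that shape, using plain Java collections rather than the Trino `Block`/`RowType` SPI (the class and method names here are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullCountShape
{
    // Leaf columns map to a count; row columns recurse into a nested map of per-field counts.
    static Object toNullCount(Object value)
    {
        if (value instanceof Map<?, ?> rowFields) {
            Map<String, Object> nested = new LinkedHashMap<>();
            for (Map.Entry<?, ?> field : rowFields.entrySet()) {
                nested.put((String) field.getKey(), toNullCount(field.getValue()));
            }
            return nested;
        }
        return value; // a plain count for leaf columns
    }

    public static void main(String[] args)
    {
        // A column "col" of type row(x bigint) with zero nulls in field "x"
        System.out.println(toNullCount(Map.of("col", Map.of("x", 0L)))); // prints {col={x=0}}
    }
}
```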

private static Optional<Object> getMin(Type type, Statistics<?> statistics)
{
if (statistics.genericGetMin() == null || !statistics.hasNonNullValue()) {
@@ -58,6 +58,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSchemaProperties.buildHiveSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toJsonValues;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toNullCounts;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY;
@@ -244,7 +245,7 @@ private void writeJsonStats(BlockBuilder entryBlockBuilder, RowType entryType, A
parquetFileStatistics.getNumRecords(),
parquetFileStatistics.getMinValues().map(values -> toJsonValues(columnTypeMapping, values)),
parquetFileStatistics.getMaxValues().map(values -> toJsonValues(columnTypeMapping, values)),
- parquetFileStatistics.getNullCount());
+ parquetFileStatistics.getNullCount().map(nullCounts -> toNullCounts(columnTypeMapping, nullCounts)));
statsJson = getStatsString(jsonFileStatistics).orElse(null);
}
else {
@@ -22,10 +22,12 @@
import io.trino.Session;
import io.trino.execution.QueryManager;
import io.trino.operator.OperatorStats;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.spi.QueryId;
import io.trino.spi.connector.SchemaTableName;
import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import io.trino.testing.BaseConnectorSmokeTest;
import io.trino.testing.DistributedQueryRunner;
@@ -40,11 +42,13 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.Sets.union;
@@ -65,6 +69,7 @@
import static io.trino.tpch.TpchTable.LINE_ITEM;
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -93,6 +98,7 @@ public abstract class BaseDeltaLakeConnectorSmokeTest
"old_timestamps",
"nested_timestamps",
"nested_timestamps_parquet_stats",
"json_stats_on_row_type",
"parquet_stats_missing",
"uppercase_columns",
"default_partitions",
@@ -831,6 +837,31 @@ public void testSelectNestedTimestamps()
assertQuery("SELECT CAST(col1[1].ts AS VARCHAR) FROM nested_timestamps_parquet_stats LIMIT 1", "VALUES '2010-02-03 12:11:10.000 UTC'");
}

@Test
public void testJsonStats()
throws Exception
{
verifySupportsInsert();

assertQuery("SELECT count(*) FROM json_stats_on_row_type", "VALUES 2");
assertThat(getAddFileEntries("json_stats_on_row_type")).hasSize(2);

assertUpdate("INSERT INTO json_stats_on_row_type SELECT CAST(row(3) AS row(x bigint))", 1);

// The first two entries created by Databricks have column stats. The last one doesn't because the connector doesn't support collecting statistics on row columns.
List<AddFileEntry> addFileEntries = getAddFileEntries("json_stats_on_row_type").stream().sorted(comparing(AddFileEntry::getModificationTime)).collect(toImmutableList());
assertThat(addFileEntries).hasSize(3);
assertThat(addFileEntries.get(0).getStatsString().orElseThrow()).isEqualTo("{\"numRecords\":1,\"minValues\":{\"col\":{\"x\":1}},\"maxValues\":{\"col\":{\"x\":1}},\"nullCount\":{\"col\":{\"x\":0}}}");
assertThat(addFileEntries.get(1).getStatsString().orElseThrow()).isEqualTo("{\"numRecords\":1,\"minValues\":{\"col\":{\"x\":2}},\"maxValues\":{\"col\":{\"x\":2}},\"nullCount\":{\"col\":{\"x\":0}}}");
assertThat(addFileEntries.get(2).getStatsString().orElseThrow()).isEqualTo("{\"numRecords\":1,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}");
}

protected List<AddFileEntry> getAddFileEntries(String tableName)
throws IOException
{
return TestDeltaLakeUtils.getAddFileEntries(new SchemaTableName(SCHEMA, tableName), getLocationForTable(bucketName, tableName));
}

@Test
public void testMissingParquetStats()
{
@@ -0,0 +1,21 @@
Data generated using Databricks 10.4:

```sql
CREATE TABLE default.json_stats_on_row_type
(col struct<x bigint>)
USING DELTA
LOCATION 's3://bucket/databricks-compatibility-test-json_stats_on_row_type'
TBLPROPERTIES (
delta.checkpointInterval = 2,
delta.checkpoint.writeStatsAsJson = false,
delta.checkpoint.writeStatsAsStruct = true
);

INSERT INTO default.json_stats_on_row_type SELECT named_struct('x', 1);
INSERT INTO default.json_stats_on_row_type SELECT named_struct('x', 2);

ALTER TABLE default.json_stats_on_row_type SET TBLPROPERTIES (
'delta.checkpoint.writeStatsAsJson' = true,
'delta.checkpoint.writeStatsAsStruct' = false
);
```
@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"4d31d39a-9c41-480c-8ccf-f30d4eb8816a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false","delta.checkpointInterval":"2"},"createdTime":1664853124645}}
{"commitInfo":{"timestamp":1664853124758,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpoint.writeStatsAsStruct\":\"true\",\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpointInterval\":\"2\"}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"eeafc8c1-d58a-4d19-998c-2b89a1cf8295"}}
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-6d366afc-74c5-4326-aea0-08cda4c18817-c000.snappy.parquet","partitionValues":{},"size":694,"modificationTime":1664853129000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":{\"x\":1}},\"maxValues\":{\"col\":{\"x\":1}},\"nullCount\":{\"col\":{\"x\":0}}}","tags":{"INSERTION_TIME":"1664853129000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1664853128886,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"694"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"d3bc34f8-eed4-4f44-aa13-580e68d2b210"}}
Binary file not shown.
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-535f04e2-d8a8-4917-a669-3122fb91825c-c000.snappy.parquet","partitionValues":{},"size":694,"modificationTime":1664853131000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":{\"x\":2}},\"maxValues\":{\"col\":{\"x\":2}},\"nullCount\":{\"col\":{\"x\":0}}}","tags":{"INSERTION_TIME":"1664853131000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1664853130696,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"694"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"7ca315f6-ee47-4528-b169-23ab05c7db75"}}
@@ -0,0 +1,2 @@
{"metaData":{"id":"4d31d39a-9c41-480c-8ccf-f30d4eb8816a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsStruct":"false","delta.checkpoint.writeStatsAsJson":"true","delta.checkpointInterval":"2"},"createdTime":1664853124645}}
{"commitInfo":{"timestamp":1664853132896,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.checkpoint.writeStatsAsJson\":\"true\",\"delta.checkpoint.writeStatsAsStruct\":\"false\"}"},"readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"5f53a6c9-6aad-4ea1-8b5d-f38c1a222ded"}}
@@ -0,0 +1 @@
{"version":2,"size":4}
Binary file not shown.
Binary file not shown.
