Support converting column stats on row type to json in Delta Lake
ebyhr committed Oct 11, 2022
1 parent b92481d commit bcbbc9f
Showing 10 changed files with 136 additions and 3 deletions.
@@ -13,10 +13,13 @@
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.plugin.base.type.DecodedTimestamp;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.DateType;
@@ -56,6 +59,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp;
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
@@ -67,6 +71,7 @@
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeUtils.readNativeValue;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
@@ -76,6 +81,7 @@
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.Objects.requireNonNull;

public class DeltaLakeParquetStatisticsUtils
{
Expand Down Expand Up @@ -154,8 +160,7 @@ public static Map<String, Object> toJsonValues(Map<String, Type> columnTypeMappi
Map<String, Object> jsonValues = new HashMap<>();
for (Map.Entry<String, Object> value : values.entrySet()) {
Type type = columnTypeMapping.get(value.getKey());
- // TODO: Add support for row type
- if (type instanceof ArrayType || type instanceof MapType || type instanceof RowType) {
+ if (type instanceof ArrayType || type instanceof MapType) {
continue;
}
jsonValues.put(value.getKey(), toJsonValue(columnTypeMapping.get(value.getKey()), value.getValue()));
Expand Down Expand Up @@ -197,6 +202,19 @@ private static Object toJsonValue(Type type, @Nullable Object value)
Instant ts = Instant.ofEpochMilli(unpackMillisUtc((long) value));
return ISO_INSTANT.format(ZonedDateTime.ofInstant(ts, UTC));
}
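// Statistics for a row column are passed in as a Block of field values; convert them to a nested JSON object keyed by field name, omitting fields whose converted value is null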
if (type instanceof RowType rowType) {
Block rowBlock = (Block) value;
ImmutableMap.Builder<String, Object> fieldValues = ImmutableMap.builder();
for (int i = 0; i < rowBlock.getPositionCount(); i++) {
RowType.Field field = rowType.getFields().get(i);
Object fieldValue = readNativeValue(field.getType(), rowBlock.getChildren().get(i), i);
Object jsonValue = toJsonValue(field.getType(), fieldValue);
if (jsonValue != null) {
fieldValues.put(field.getName().orElseThrow(), jsonValue);
}
}
return fieldValues.buildOrThrow();
}

throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -222,6 +240,36 @@ private static Map<String, Object> jsonEncode(Map<String, Optional<Statistics<?>
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().get()));
}

public static Map<String, Object> toNullCounts(Map<String, Type> columnTypeMapping, Map<String, Object> values)
{
ImmutableMap.Builder<String, Object> nullCounts = ImmutableMap.builderWithExpectedSize(values.size());
for (Map.Entry<String, Object> value : values.entrySet()) {
Type type = columnTypeMapping.get(value.getKey());
requireNonNull(type, "type is null");
nullCounts.put(value.getKey(), toNullCount(type, value.getValue()));
}
return nullCounts.buildOrThrow();
}

private static Object toNullCount(Type type, Object value)
{
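// A row column's null count is itself a Block of per-field counts: nested row fields are converted recursively into nested maps, leaf fields are read as plain long counts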
if (type instanceof RowType rowType) {
ColumnarRow row = toColumnarRow((Block) value);
ImmutableMap.Builder<String, Object> nullCounts = ImmutableMap.builderWithExpectedSize(row.getPositionCount());
for (int i = 0; i < row.getPositionCount(); i++) {
RowType.Field field = rowType.getFields().get(i);
if (field.getType() instanceof RowType) {
nullCounts.put(field.getName().orElseThrow(), toNullCount(field.getType(), row.getField(i)));
}
else {
nullCounts.put(field.getName().orElseThrow(), BIGINT.getLong(row.getField(i), 0));
}
}
return nullCounts.buildOrThrow();
}
return value;
}

private static Optional<Object> getMin(Type type, Statistics<?> statistics)
{
if (statistics.genericGetMin() == null || !statistics.hasNonNullValue()) {
@@ -58,6 +58,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSchemaProperties.buildHiveSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.jsonValueToTrinoValue;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toJsonValues;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeParquetStatisticsUtils.toNullCounts;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY;
Expand Down Expand Up @@ -244,7 +245,7 @@ private void writeJsonStats(BlockBuilder entryBlockBuilder, RowType entryType, A
parquetFileStatistics.getNumRecords(),
parquetFileStatistics.getMinValues().map(values -> toJsonValues(columnTypeMapping, values)),
parquetFileStatistics.getMaxValues().map(values -> toJsonValues(columnTypeMapping, values)),
- parquetFileStatistics.getNullCount());
+ parquetFileStatistics.getNullCount().map(nullCounts -> toNullCounts(columnTypeMapping, nullCounts)));
statsJson = getStatsString(jsonFileStatistics).orElse(null);
}
else {
@@ -22,6 +22,7 @@
import io.trino.Session;
import io.trino.execution.QueryManager;
import io.trino.operator.OperatorStats;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
@@ -40,11 +41,13 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.Sets.union;
@@ -65,6 +68,7 @@
import static io.trino.tpch.TpchTable.LINE_ITEM;
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -93,6 +97,7 @@ public abstract class BaseDeltaLakeConnectorSmokeTest
"old_timestamps",
"nested_timestamps",
"nested_timestamps_parquet_stats",
"json_stats_on_row_type",
"parquet_stats_missing",
"uppercase_columns",
"default_partitions",
@@ -831,6 +836,59 @@ public void testSelectNestedTimestamps()
assertQuery("SELECT CAST(col1[1].ts AS VARCHAR) FROM nested_timestamps_parquet_stats LIMIT 1", "VALUES '2010-02-03 12:11:10.000 UTC'");
}

@Test
public void testConvertJsonStatisticsToParquetOnRowType()
throws Exception
{
verifySupportsInsert();

assertQuery("SELECT count(*) FROM json_stats_on_row_type", "VALUES 2");
String transactionLogDirectory = "json_stats_on_row_type/_delta_log";
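// The Databricks-generated table already has versions 0-3 (CREATE TABLE, two INSERTs, SET TBLPROPERTIES), so the INSERT below writes version 4 and, with delta.checkpointInterval = 2, triggers a new checkpoint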
String newTransactionFile = getLocationForTable(bucketName, "json_stats_on_row_type") + "/_delta_log/00000000000000000004.json";
String newCheckpointFile = getLocationForTable(bucketName, "json_stats_on_row_type") + "/_delta_log/00000000000000000004.checkpoint.parquet";
assertThat(getTableFiles(transactionLogDirectory))
.doesNotContain(newTransactionFile, newCheckpointFile);

assertUpdate("INSERT INTO json_stats_on_row_type SELECT CAST(row(3) AS row(x bigint)), CAST(row(row('test insert')) AS row(y row(nested varchar)))", 1);
assertThat(getTableFiles(transactionLogDirectory))
.contains(newTransactionFile, newCheckpointFile);
assertThat(getAddFileEntries("json_stats_on_row_type")).hasSize(3);

// The first two entries created by Databricks have column stats. The last one doesn't have column stats because the connector doesn't support collecting them on row columns.
List<AddFileEntry> addFileEntries = getAddFileEntries("json_stats_on_row_type").stream().sorted(comparing(AddFileEntry::getModificationTime)).collect(toImmutableList());
assertThat(addFileEntries).hasSize(3);
assertJsonStatistics(
addFileEntries.get(0),
"{" +
"\"numRecords\":1," +
"\"minValues\":{\"nested_struct_col\":{\"y\":{\"nested\":\"test\"}},\"struct_col\":{\"x\":1}}," +
"\"maxValues\":{\"nested_struct_col\":{\"y\":{\"nested\":\"test\"}},\"struct_col\":{\"x\":1}}," +
"\"nullCount\":{\"struct_col\":{\"x\":0},\"nested_struct_col\":{\"y\":{\"nested\":0}}}" +
"}");
assertJsonStatistics(
addFileEntries.get(1),
"{" +
"\"numRecords\":1," +
"\"minValues\":{\"nested_struct_col\":{\"y\":{}},\"struct_col\":{}}," +
"\"maxValues\":{\"nested_struct_col\":{\"y\":{}},\"struct_col\":{}}," +
"\"nullCount\":{\"struct_col\":{\"x\":1},\"nested_struct_col\":{\"y\":{\"nested\":1}}}" +
"}");
assertJsonStatistics(
addFileEntries.get(2),
"{\"numRecords\":1,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}");
}

private List<AddFileEntry> getAddFileEntries(String tableName)
throws IOException
{
return TestingDeltaLakeUtils.getAddFileEntries(getLocationForTable(bucketName, tableName));
}

private void assertJsonStatistics(AddFileEntry addFileEntry, @Language("JSON") String jsonStatistics)
{
assertEquals(addFileEntry.getStatsString().orElseThrow(), jsonStatistics);
}

@Test
public void testMissingParquetStats()
{
@@ -0,0 +1,21 @@
Data generated using Databricks 10.4:

```sql
CREATE TABLE default.json_stats_on_row_type
(struct_col struct<x bigint>, nested_struct_col struct<y struct<nested string>>)
USING DELTA
LOCATION 's3://bucket/table'
TBLPROPERTIES (
delta.checkpointInterval = 2,
delta.checkpoint.writeStatsAsJson = false,
delta.checkpoint.writeStatsAsStruct = true
);

INSERT INTO default.json_stats_on_row_type SELECT named_struct('x', 1), named_struct('y', named_struct('nested', 'test'));
INSERT INTO default.json_stats_on_row_type SELECT named_struct('x', NULL), named_struct('y', named_struct('nested', NULL));

ALTER TABLE default.json_stats_on_row_type SET TBLPROPERTIES (
'delta.checkpoint.writeStatsAsJson' = true,
'delta.checkpoint.writeStatsAsStruct' = false
);
```
@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-df481541-fe59-4af2-a37f-68a39a1e2a5d-c000.snappy.parquet","partitionValues":{},"size":1074,"modificationTime":1664924826000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"struct_col\":{},\"nested_struct_col\":{\"y\":{}}},\"maxValues\":{\"struct_col\":{},\"nested_struct_col\":{\"y\":{}}},\"nullCount\":{\"struct_col\":{\"x\":1},\"nested_struct_col\":{\"y\":{\"nested\":1}}}}","tags":{"INSERTION_TIME":"1664924826000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1664924826032,"userId":"7853186923043731","userName":"yuya.ebihara@starburstdata.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"2299734316069194"},"clusterId":"0620-043712-o6vqr39c","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"1074"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"974a8474-26b8-42fc-a2c6-4d6af29109a2"}}
@@ -0,0 +1,2 @@
{"metaData":{"id":"0e741658-b990-49a7-a25d-d148943b2f44","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_col\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct_col\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"y\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsStruct":"false","delta.checkpoint.writeStatsAsJson":"true","delta.checkpointInterval":"2"},"createdTime":1664924821035}}
{"commitInfo":{"timestamp":1664924827940,"userId":"7853186923043731","userName":"yuya.ebihara@starburstdata.com","operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.checkpoint.writeStatsAsJson\":\"true\",\"delta.checkpoint.writeStatsAsStruct\":\"false\"}"},"notebook":{"notebookId":"2299734316069194"},"clusterId":"0620-043712-o6vqr39c","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"f37d7c4a-027d-4bf6-9bf3-0f26b919018c"}}
@@ -0,0 +1 @@
{"version":2,"size":4}
