Multiply on disk column sizes of iceberg files by 4 for column stats
homar committed Dec 7, 2022
1 parent 4b26670 commit bc1c0a4
Showing 26 changed files with 592 additions and 511 deletions.
@@ -14,19 +14,22 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.DoubleRange;
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.io.IOException;
@@ -40,6 +43,8 @@
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toUnmodifiableMap;

@@ -88,6 +93,9 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)
List<IcebergColumnHandle> columnHandles = getColumns(icebergTableSchema, typeManager);
Map<Integer, IcebergColumnHandle> idToColumnHandle = columnHandles.stream()
.collect(toUnmodifiableMap(IcebergColumnHandle::getId, identity()));
Map<Integer, org.apache.iceberg.types.Type> idToType = columns.stream()
.map(column -> Maps.immutableEntry(column.fieldId(), column.type()))
.collect(toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(enforcedPredicate))
@@ -114,17 +122,42 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle)

ImmutableMap.Builder<ColumnHandle, ColumnStatistics> columnHandleBuilder = ImmutableMap.builder();
double recordCount = summary.getRecordCount();
for (IcebergColumnHandle columnHandle : idToColumnHandle.values()) {
for (Map.Entry<Integer, IcebergColumnHandle> columnHandleTuple : idToColumnHandle.entrySet()) {
IcebergColumnHandle columnHandle = columnHandleTuple.getValue();
int fieldId = columnHandle.getId();
ColumnStatistics.Builder columnBuilder = new ColumnStatistics.Builder();
Long nullCount = summary.getNullCounts().get(fieldId);
if (nullCount != null) {
columnBuilder.setNullsFraction(Estimate.of(nullCount / recordCount));
}
if (summary.getColumnSizes() != null) {
if (idToType.get(columnHandleTuple.getKey()).typeId() == Type.TypeID.FIXED) {
Types.FixedType fixedType = (Types.FixedType) idToType.get(columnHandleTuple.getKey());
long columnSize = fixedType.length();
columnBuilder.setDataSize(Estimate.of(columnSize));
}
else if (summary.getColumnSizes() != null) {
Long columnSize = summary.getColumnSizes().get(fieldId);
if (columnSize != null) {
columnBuilder.setDataSize(Estimate.of(columnSize));
// columnSize is the size on disk, while Trino's column statistics report the size in memory.
// The relation between the two is type- and data-dependent.
// However, Trino currently does not use data size statistics for fixed-width types
// (it does not need them), so do not report anything for them, to avoid publishing a bogus value.
if (!(columnHandle.getBaseType() instanceof FixedWidthType)) {
if (columnHandle.getBaseType() == VARCHAR) {
// Tested with the item table from the TPC-DS benchmark by comparing the on-disk size
// of the item_desc column stored inside the files
// with the length of the values in that column as reported by Trino
columnSize = (long) (columnSize * 2.7);
}
else if (columnHandle.getBaseType() == VARBINARY) {
// Tested with VARBINARY columns holding data of random length and content,
// comparing the column size stored inside Parquet files with the number of bytes written to the column.
// The test data consisted of alphanumeric characters, so it was not a very realistic scenario.
// A better heuristic could be used here in the future.
columnSize = (long) (columnSize * 1.4);
}
columnBuilder.setDataSize(Estimate.of(columnSize));
}
}
}
Object min = summary.getMinValues().get(fieldId);
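For orientation, here is a minimal, self-contained sketch of the scaling heuristic introduced in this hunk. The class name and the simplified signature are assumptions made purely for illustration; the multipliers (2.7 for VARCHAR, 1.4 for VARBINARY) are taken from the code above, and note that, despite the commit title, the change applies these per-type factors rather than a flat factor of 4. The Iceberg FIXED-length branch is omitted here.

import io.trino.spi.statistics.Estimate;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.Type;

import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;

final class DataSizeHeuristicSketch
{
    private DataSizeHeuristicSketch() {}

    // Hypothetical helper mirroring the logic above: turn a Parquet on-disk column size
    // into the in-memory data size estimate reported through Trino column statistics.
    static Estimate estimateDataSize(Type trinoBaseType, long onDiskColumnSize)
    {
        if (trinoBaseType instanceof FixedWidthType) {
            // Trino does not use data size statistics for fixed-width types, so report nothing.
            return Estimate.unknown();
        }
        if (trinoBaseType == VARCHAR) {
            // Empirical factor from the TPC-DS item table comparison described above.
            return Estimate.of((long) (onDiskColumnSize * 2.7));
        }
        if (trinoBaseType == VARBINARY) {
            // Empirical factor from the random-content VARBINARY comparison described above.
            return Estimate.of((long) (onDiskColumnSize * 1.4));
        }
        // Other variable-width types keep the raw on-disk size.
        return Estimate.of(onDiskColumnSize);
    }
}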
@@ -846,8 +846,8 @@ public void testCreatePartitionedTable()
" ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " +
" ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " +
" ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " +
" ('a_varchar', 87e0, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_varbinary', 82e0, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_varchar', 234e0, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_varbinary', 114e0, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " +
" ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " +
@@ -856,7 +856,7 @@ public void testCreatePartitionedTable()
" ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " +
" ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " +
" ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " +
" ('a quoted, field', 83e0, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a quoted, field', 224e0, NULL, 0.5e0, NULL, NULL, NULL), " +
" (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
}
case AVRO -> {
@@ -2620,7 +2620,7 @@ public void testTruncateTextTransform()
assertThat(query("SHOW STATS FOR test_truncate_text_transform"))
.skippingTypesCheck()
.matches("VALUES " +
" ('d', " + (format == PARQUET ? "205e0" : "NULL") + ", NULL, " + (format == AVRO ? "NULL" : "0.125e0") + ", NULL, NULL, NULL), " +
" ('d', " + (format == PARQUET ? "553e0" : "NULL") + ", NULL, " + (format == AVRO ? "NULL" : "0.125e0") + ", NULL, NULL, NULL), " +
(format == AVRO ? " ('b', NULL, NULL, NULL, NULL, NULL, NULL), " : " ('b', NULL, NULL, 0e0, NULL, '1', '101'), ") +
" (NULL, NULL, NULL, NULL, 8e0, NULL, NULL)");

@@ -2912,7 +2912,7 @@ public void testApplyFilterWithNonEmptyConstraintPredicate()
}
if (format == PARQUET) {
expected = "VALUES " +
" ('d', 136e0, NULL, 0e0, NULL, NULL, NULL), " +
" ('d', 367e0, NULL, 0e0, NULL, NULL, NULL), " +
" ('b', NULL, NULL, 0e0, NULL, '1', '7'), " +
" (NULL, NULL, NULL, NULL, 7e0, NULL, NULL)";
}
@@ -2971,7 +2971,7 @@ public void testVoidTransform()
assertThat(query("SHOW STATS FOR test_void_transform"))
.skippingTypesCheck()
.matches("VALUES " +
" ('d', " + (format == PARQUET ? "76e0" : "NULL") + ", NULL, 0.2857142857142857, NULL, NULL, NULL), " +
" ('d', " + (format == PARQUET ? "205e0" : "NULL") + ", NULL, 0.2857142857142857, NULL, NULL, NULL), " +
" ('b', NULL, NULL, 0e0, NULL, '1', '7'), " +
" (NULL, NULL, NULL, NULL, 7e0, NULL, NULL)");
}
@@ -3136,8 +3136,8 @@ public void testBasicAnalyze()
" (NULL, NULL, NULL, NULL, 5e0, NULL, NULL)")
: ("VALUES " +
" ('regionkey', NULL, NULL, 0e0, NULL, '0', '4'), " +
" ('name', " + (format == PARQUET ? "87e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('comment', " + (format == PARQUET ? "237e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('name', " + (format == PARQUET ? "234e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('comment', " + (format == PARQUET ? "639e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" (NULL, NULL, NULL, NULL, 5e0, NULL, NULL)");

String statsWithNdv = format == AVRO
@@ -3148,8 +3148,8 @@ public void testBasicAnalyze()
" (NULL, NULL, NULL, NULL, 5e0, NULL, NULL)")
: ("VALUES " +
" ('regionkey', NULL, 5e0, 0e0, NULL, '0', '4'), " +
" ('name', " + (format == PARQUET ? "87e0" : "NULL") + ", 5e0, 0e0, NULL, NULL, NULL), " +
" ('comment', " + (format == PARQUET ? "237e0" : "NULL") + ", 5e0, 0e0, NULL, NULL, NULL), " +
" ('name', " + (format == PARQUET ? "234e0" : "NULL") + ", 5e0, 0e0, NULL, NULL, NULL), " +
" ('comment', " + (format == PARQUET ? "639e0" : "NULL") + ", 5e0, 0e0, NULL, NULL, NULL), " +
" (NULL, NULL, NULL, NULL, 5e0, NULL, NULL)");

// initially, no NDV information
@@ -3588,8 +3588,8 @@ public void testCreateNestedPartitionedTable()
" ('dbl', NULL, NULL, 0e0, NULL, '1.0', '1.0'), " +
" ('mp', NULL, NULL, " + (format == ORC ? "0e0" : "NULL") + ", NULL, NULL, NULL), " +
" ('dec', NULL, NULL, 0e0, NULL, '1.0', '1.0'), " +
" ('vc', " + (format == PARQUET ? "43e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('vb', " + (format == PARQUET ? "55e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('vc', " + (format == PARQUET ? "116e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('vb', " + (format == PARQUET ? "77e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('ts', NULL, NULL, 0e0, NULL, '2021-07-24 02:43:57.348000', " + (format == ORC ? "'2021-07-24 02:43:57.348999'" : "'2021-07-24 02:43:57.348000'") + "), " +
" ('tstz', NULL, NULL, 0e0, NULL, '2021-07-24 02:43:57.348 UTC', '2021-07-24 02:43:57.348 UTC'), " +
" ('str', NULL, NULL, " + (format == ORC ? "0e0" : "NULL") + ", NULL, NULL, NULL), " +
@@ -3651,7 +3651,7 @@ public void testCreateNestedPartitionedTable()
" ('dbl', NULL, NULL, 0e0, NULL, '1.0', '1.0'), " +
" ('mp', NULL, NULL, " + (format == ORC ? "0e0" : "NULL") + ", NULL, NULL, NULL), " +
" ('dec', NULL, NULL, 0e0, NULL, '1.0', '1.0'), " +
" ('vc', " + (format == PARQUET ? "43e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('vc', " + (format == PARQUET ? "116e0" : "NULL") + ", NULL, 0e0, NULL, NULL, NULL), " +
" ('str', NULL, NULL, " + (format == ORC ? "0e0" : "NULL") + ", NULL, NULL, NULL), " +
" (NULL, NULL, NULL, NULL, 1e0, NULL, NULL)");
}
@@ -3963,8 +3963,8 @@ public void testAllAvailableTypes()
" ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " +
" ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " +
" ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " +
" ('a_varchar', " + (format == PARQUET ? "87e0" : "NULL") + ", NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_varbinary', " + (format == PARQUET ? "82e0" : "NULL") + ", NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_varchar', " + (format == PARQUET ? "234e0" : "NULL") + ", NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_varbinary', " + (format == PARQUET ? "114e0" : "NULL") + ", NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " +
" ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " +
" ('a_timestamp', NULL, NULL, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " +
@@ -4017,8 +4017,8 @@ public void testAllAvailableTypes()
" ('a_double', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " +
" ('a_short_decimal', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " +
" ('a_long_decimal', NULL, 1e0, 0.5e0, NULL, '11.0', '11.0'), " +
" ('a_varchar', " + (format == PARQUET ? "87e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " +
" ('a_varbinary', " + (format == PARQUET ? "82e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " +
" ('a_varchar', " + (format == PARQUET ? "234e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " +
" ('a_varbinary', " + (format == PARQUET ? "114e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " +
" ('a_date', NULL, 1e0, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " +
" ('a_time', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " +
" ('a_timestamp', NULL, 1e0, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " +
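The updated Parquet data-size expectations in this test file appear to be the previous on-disk sizes scaled by the new multipliers and truncated to long. Below is a throwaway check for two of the values above (87e0 to 234e0 for a_varchar and 82e0 to 114e0 for a_varbinary); the other updated numbers were not re-derived here.

public class ExpectedStatsCheck
{
    public static void main(String[] args)
    {
        // a_varchar: previously 87 bytes on disk -> (long) (87 * 2.7) = 234, the new expected value
        System.out.println((long) (87 * 2.7));
        // a_varbinary: previously 82 bytes on disk -> (long) (82 * 1.4) = 114, the new expected value
        System.out.println((long) (82 * 1.4));
    }
}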
@@ -1496,15 +1496,25 @@ public void testPartialStats()
String sparkTableName = sparkTableName(tableName);
String trinoTableName = trinoTableName(tableName);

onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col0 INT, col1 INT)");
onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2)");
onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col0 INT, col1 INT, col2 STRING, col3 BINARY)");
onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2, 'col2Value0', X'000102f0feff')");
assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName))
.containsOnly(row("col0", null, null, 0.0, null, "1", "1"), row("col1", null, null, 0.0, null, "2", "2"), row(null, null, null, null, 1.0, null, null));
.containsOnly(
row("col0", null, null, 0.0, null, "1", "1"),
row("col1", null, null, 0.0, null, "2", "2"),
row("col2", 151.0, null, 0.0, null, null, null),
row("col3", 72.0, null, 0.0, null, null, null),
row(null, null, null, null, 1.0, null, null));

onSpark().executeQuery("ALTER TABLE " + sparkTableName + " SET TBLPROPERTIES (write.metadata.metrics.column.col1='none')");
onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 4)");
onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 4, 'col2Value1', X'000102f0feee')");
assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName))
.containsOnly(row("col0", null, null, 0.0, null, "1", "3"), row("col1", null, null, null, null, null, null), row(null, null, null, null, 2.0, null, null));
.containsOnly(
row("col0", null, null, 0.0, null, "1", "3"),
row("col1", null, null, null, null, null, null),
row("col2", 305.0, null, 0.0, null, null, null),
row("col3", 145.0, null, 0.0, null, null, null),
row(null, null, null, null, 2.0, null, null));

onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
@@ -10,14 +10,14 @@ local exchange (GATHER, SINGLE, [])
scan customer_address
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["c_current_addr_sk"])
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["c_customer_sk"])
local exchange (REPARTITION, ROUND_ROBIN, [])
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["ss_customer_sk"])
join (INNER, REPLICATED):
join (INNER, PARTITIONED):
remote exchange (REPARTITION, HASH, ["c_customer_sk"])
local exchange (REPARTITION, ROUND_ROBIN, [])
join (INNER, REPLICATED):
scan customer
local exchange (GATHER, SINGLE, [])
remote exchange (REPARTITION, HASH, ["ss_customer_sk"])
local exchange (REPARTITION, ROUND_ROBIN, [])
join (INNER, REPLICATED):
join (INNER, REPLICATED):
scan store_sales
@@ -27,6 +27,6 @@
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan date_dim
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan store
local exchange (GATHER, SINGLE, [])
remote exchange (REPLICATE, BROADCAST, [])
scan store