Skip to content

Commit

Permalink
Core: Include record_count with stats in ManifestReader (#1820)
Browse files Browse the repository at this point in the history
  • Loading branch information
yyanyy authored Feb 3, 2021
1 parent feb2dd8 commit 97703fb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 29 deletions.
4 changes: 1 addition & 3 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand All @@ -39,7 +38,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;

Expand Down Expand Up @@ -166,7 +164,7 @@ public CloseableIterable<FileScanTask> planFiles() {

boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
if (!deleteFiles.isEmpty()) {
select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList()));
select(ManifestReader.withStatsColumns(columns));
}

Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
Expand Down
24 changes: 15 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
Expand All @@ -52,8 +53,9 @@
public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup implements CloseableIterable<F> {
static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
static final Set<String> STATS_COLUMNS = Sets.newHashSet(
"value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds");

private static final Set<String> STATS_COLUMNS = ImmutableSet.of(
"value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds", "record_count");

protected enum FileType {
DATA_FILES(GenericDataFile.class.getName()),
Expand Down Expand Up @@ -282,16 +284,20 @@ private static boolean requireStatsProjection(Expression rowFilter, Collection<S

static boolean dropStats(Expression rowFilter, Collection<String> columns) {
// Make sure we only drop all stats if we had projected all stats
// We do not drop stats even if we had partially added some stats columns
return rowFilter != Expressions.alwaysTrue() &&
columns != null &&
!columns.containsAll(ManifestReader.ALL_COLUMNS) &&
Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
// We do not drop stats even if we had partially added some stats columns, except for the record_count column:
// record_count is a primitive type, so selecting it alone should not make us keep the stats maps, which
// could be huge in size.
if (rowFilter != Expressions.alwaysTrue() && columns != null &&
!columns.containsAll(ManifestReader.ALL_COLUMNS)) {
Set<String> intersection = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS);
return intersection.isEmpty() || intersection.equals(Sets.newHashSet("record_count"));
}
return false;
}

private static Collection<String> withStatsColumns(Collection<String> columns) {
static List<String> withStatsColumns(Collection<String> columns) {
if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
return columns;
return Lists.newArrayList(columns);
} else {
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
Expand Down
88 changes: 71 additions & 17 deletions core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.Map;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
Expand Down Expand Up @@ -54,9 +54,9 @@ public TestManifestReaderStats(int formatVersion) {

private static final Metrics METRICS = new Metrics(3L, null,
VALUE_COUNT, NULL_VALUE_COUNTS, NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS);

private static final String FILE_PATH = "/path/to/data-a.parquet";
private static final DataFile FILE = DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withPath(FILE_PATH)
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(3)
Expand All @@ -74,7 +74,7 @@ public void testReadIncludesFullStats() throws IOException {
}

@Test
public void testReadWithFilterIncludesFullStats() throws IOException {
public void testReadEntriesWithFilterIncludesFullStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.filterRows(Expressions.equal("id", 3))) {
Expand All @@ -84,11 +84,21 @@ public void testReadWithFilterIncludesFullStats() throws IOException {
}
}

/**
 * Iterating a reader that has a row filter but no explicit column selection must still return full stats.
 */
@Test
public void testReadIteratorWithFilterIncludesFullStats() throws IOException {
  ManifestFile manifestFile = writeManifest(1000L, FILE);
  try (ManifestReader<DataFile> filtered = ManifestFiles.read(manifestFile, FILE_IO)
      .filterRows(Expressions.equal("id", 3))) {
    // only the first file produced by the iterator is inspected
    assertFullStats(filtered.iterator().next());
  }
}

@Test
public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count"))
.select(ImmutableList.of("file_path"))
.filterRows(Expressions.equal("id", 3))) {
CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
ManifestEntry<DataFile> entry = entries.iterator().next();
Expand All @@ -100,47 +110,79 @@ public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOExcep
public void testReadIteratorWithFilterAndSelectDropsStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count"))
.select(ImmutableList.of("file_path"))
.filterRows(Expressions.equal("id", 3))) {
DataFile entry = reader.iterator().next();
assertStatsDropped(entry);
}
}

/**
 * Selecting only file_path and record_count together with a row filter is expected to yield a file whose
 * stats are dropped, as checked by assertStatsDropped.
 */
@Test
public void testReadIteratorWithFilterAndSelectRecordCountDropsStats() throws IOException {
  ManifestFile manifestFile = writeManifest(1000L, FILE);
  try (ManifestReader<DataFile> projected = ManifestFiles.read(manifestFile, FILE_IO)
      .select(ImmutableList.of("file_path", "record_count"))
      .filterRows(Expressions.equal("id", 3))) {
    // only the first file produced by the iterator is inspected
    assertStatsDropped(projected.iterator().next());
  }
}

@Test
public void testReadIteratorWithFilterAndSelectStatsIncludesFullStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count", "value_counts"))
.select(ImmutableList.of("file_path", "value_counts"))
.filterRows(Expressions.equal("id", 3))) {
DataFile entry = reader.iterator().next();
assertFullStats(entry);

// explicitly call copyWithoutStats and ensure record count will not be dropped
assertStatsDropped(entry.copyWithoutStats());
}
}

@Test
public void testReadEntriesWithSelectNotIncludeFullStats() throws IOException {
public void testReadEntriesWithSelectNotProjectStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count"))) {
.select(ImmutableList.of("file_path"))) {
CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
ManifestEntry<DataFile> entry = entries.iterator().next();
assertStatsDropped(entry.file());
DataFile dataFile = entry.file();

// selected field is populated
Assert.assertEquals(FILE_PATH, dataFile.path());

// not selected fields are all null and not projected
Assert.assertNull(dataFile.columnSizes());
Assert.assertNull(dataFile.valueCounts());
Assert.assertNull(dataFile.nullValueCounts());
Assert.assertNull(dataFile.lowerBounds());
Assert.assertNull(dataFile.upperBounds());
Assert.assertNull(dataFile.nanValueCounts());
assertNullRecordCount(dataFile);
}
}

@Test
public void testReadEntriesWithSelectCertainStatNotIncludeFullStats() throws IOException {
public void testReadEntriesWithSelectCertainStatNotProjectStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count", "value_counts"))) {
.select(ImmutableList.of("file_path", "value_counts"))) {
DataFile dataFile = reader.iterator().next();

Assert.assertEquals(3, dataFile.recordCount());
Assert.assertNull(dataFile.columnSizes());
// selected fields are populated
Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts());
Assert.assertEquals(FILE_PATH, dataFile.path());

// not selected fields are all null and not projected
Assert.assertNull(dataFile.columnSizes());
Assert.assertNull(dataFile.nullValueCounts());
Assert.assertNull(dataFile.nanValueCounts());
Assert.assertNull(dataFile.lowerBounds());
Assert.assertNull(dataFile.upperBounds());
Assert.assertNull(dataFile.nanValueCounts());
assertNullRecordCount(dataFile);
}
}

Expand All @@ -149,19 +191,31 @@ private void assertFullStats(DataFile dataFile) {
Assert.assertNull(dataFile.columnSizes());
Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts());
Assert.assertEquals(NULL_VALUE_COUNTS, dataFile.nullValueCounts());
Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());
Assert.assertEquals(LOWER_BOUNDS, dataFile.lowerBounds());
Assert.assertEquals(UPPER_BOUNDS, dataFile.upperBounds());
Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());

Assert.assertEquals(FILE_PATH, dataFile.path()); // always select file path in all test cases
}

private void assertStatsDropped(DataFile dataFile) {
Assert.assertEquals(3, dataFile.recordCount()); // always select record count in all test cases
Assert.assertEquals(3, dataFile.recordCount()); // record count is not considered as droppable stats
Assert.assertNull(dataFile.columnSizes());
Assert.assertNull(dataFile.valueCounts());
Assert.assertNull(dataFile.nullValueCounts());
Assert.assertNull(dataFile.lowerBounds());
Assert.assertNull(dataFile.upperBounds());
Assert.assertNull(dataFile.nanValueCounts());

Assert.assertEquals(FILE_PATH, dataFile.path()); // always select file path in all test cases
}

/**
 * Asserts that reading the record count of {@code dataFile} fails with a {@link NullPointerException},
 * which is how these tests detect that the record count field was not populated.
 *
 * @param dataFile the file whose record count is expected to be unpopulated
 */
private void assertNullRecordCount(DataFile dataFile) {
// record count is a primitive type, accessing null record count will throw NPE
AssertHelpers.assertThrows(
"Should throw NPE when accessing non-populated record count field",
NullPointerException.class,
dataFile::recordCount);
}

}

0 comments on commit 97703fb

Please sign in to comment.