Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: update record_count behavior, include in manifest reader #1820

Merged
merged 6 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand All @@ -39,7 +38,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;

Expand Down Expand Up @@ -166,7 +164,7 @@ public CloseableIterable<FileScanTask> planFiles() {

boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
if (!deleteFiles.isEmpty()) {
select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList()));
select(ManifestReader.withStatsColumns(columns));
}

Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
Expand Down
24 changes: 15 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
Expand All @@ -52,8 +53,9 @@
public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup implements CloseableIterable<F> {
static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
static final Set<String> STATS_COLUMNS = Sets.newHashSet(
"value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds");

private static final Set<String> STATS_COLUMNS = ImmutableSet.of(
"value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", "upper_bounds", "record_count");

protected enum FileType {
DATA_FILES(GenericDataFile.class.getName()),
Expand Down Expand Up @@ -282,16 +284,20 @@ private static boolean requireStatsProjection(Expression rowFilter, Collection<S

static boolean dropStats(Expression rowFilter, Collection<String> columns) {
// Make sure we only drop all stats if we had projected all stats
// We do not drop stats even if we had partially added some stats columns
return rowFilter != Expressions.alwaysTrue() &&
columns != null &&
!columns.containsAll(ManifestReader.ALL_COLUMNS) &&
Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
// We do not drop stats even if we had partially added some stats columns, except for record_count column.
// Since we don't want to keep stats map which could be huge in size just because we select record_count, which
// is a primitive type.
if (rowFilter != Expressions.alwaysTrue() && columns != null &&
!columns.containsAll(ManifestReader.ALL_COLUMNS)) {
Set<String> intersection = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS);
return intersection.isEmpty() || intersection.equals(Sets.newHashSet("record_count"));
}
return false;
}

private static Collection<String> withStatsColumns(Collection<String> columns) {
static List<String> withStatsColumns(Collection<String> columns) {
if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
return columns;
return Lists.newArrayList(columns);
} else {
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
Expand Down
88 changes: 71 additions & 17 deletions core/src/test/java/org/apache/iceberg/TestManifestReaderStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.Map;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
Expand Down Expand Up @@ -54,9 +54,9 @@ public TestManifestReaderStats(int formatVersion) {

private static final Metrics METRICS = new Metrics(3L, null,
VALUE_COUNT, NULL_VALUE_COUNTS, NAN_VALUE_COUNTS, LOWER_BOUNDS, UPPER_BOUNDS);

private static final String FILE_PATH = "/path/to/data-a.parquet";
private static final DataFile FILE = DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withPath(FILE_PATH)
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(3)
Expand All @@ -74,7 +74,7 @@ public void testReadIncludesFullStats() throws IOException {
}

@Test
public void testReadWithFilterIncludesFullStats() throws IOException {
public void testReadEntriesWithFilterIncludesFullStats() throws IOException {
rdblue marked this conversation as resolved.
Show resolved Hide resolved
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.filterRows(Expressions.equal("id", 3))) {
Expand All @@ -84,11 +84,21 @@ public void testReadWithFilterIncludesFullStats() throws IOException {
}
}

@Test
public void testReadIteratorWithFilterIncludesFullStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.filterRows(Expressions.equal("id", 3))) {
DataFile entry = reader.iterator().next();
assertFullStats(entry);
}
}

@Test
public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count"))
.select(ImmutableList.of("file_path"))
.filterRows(Expressions.equal("id", 3))) {
CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
ManifestEntry<DataFile> entry = entries.iterator().next();
Expand All @@ -100,47 +110,79 @@ public void testReadEntriesWithFilterAndSelectIncludesFullStats() throws IOExcep
public void testReadIteratorWithFilterAndSelectDropsStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count"))
.select(ImmutableList.of("file_path"))
.filterRows(Expressions.equal("id", 3))) {
DataFile entry = reader.iterator().next();
assertStatsDropped(entry);
}
}

@Test
public void testReadIteratorWithFilterAndSelectRecordCountDropsStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableList.of("file_path", "record_count"))
.filterRows(Expressions.equal("id", 3))) {
DataFile entry = reader.iterator().next();
assertStatsDropped(entry);
}
}

@Test
public void testReadIteratorWithFilterAndSelectStatsIncludesFullStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count", "value_counts"))
.select(ImmutableList.of("file_path", "value_counts"))
.filterRows(Expressions.equal("id", 3))) {
DataFile entry = reader.iterator().next();
assertFullStats(entry);

// explicitly call copyWithoutStats and ensure record count will not be dropped
assertStatsDropped(entry.copyWithoutStats());
}
}

@Test
public void testReadEntriesWithSelectNotIncludeFullStats() throws IOException {
public void testReadEntriesWithSelectNotProjectStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count"))) {
.select(ImmutableList.of("file_path"))) {
CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
ManifestEntry<DataFile> entry = entries.iterator().next();
assertStatsDropped(entry.file());
DataFile dataFile = entry.file();

// selected field is populated
Assert.assertEquals(FILE_PATH, dataFile.path());

// not selected fields are all null and not projected
Assert.assertNull(dataFile.columnSizes());
Assert.assertNull(dataFile.valueCounts());
Assert.assertNull(dataFile.nullValueCounts());
Assert.assertNull(dataFile.lowerBounds());
Assert.assertNull(dataFile.upperBounds());
Assert.assertNull(dataFile.nanValueCounts());
assertNullRecordCount(dataFile);
}
}

@Test
public void testReadEntriesWithSelectCertainStatNotIncludeFullStats() throws IOException {
public void testReadEntriesWithSelectCertainStatNotProjectStats() throws IOException {
ManifestFile manifest = writeManifest(1000L, FILE);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
.select(ImmutableSet.of("record_count", "value_counts"))) {
.select(ImmutableList.of("file_path", "value_counts"))) {
DataFile dataFile = reader.iterator().next();

Assert.assertEquals(3, dataFile.recordCount());
Assert.assertNull(dataFile.columnSizes());
// selected fields are populated
Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts());
Assert.assertEquals(FILE_PATH, dataFile.path());

// not selected fields are all null and not projected
Assert.assertNull(dataFile.columnSizes());
Assert.assertNull(dataFile.nullValueCounts());
Assert.assertNull(dataFile.nanValueCounts());
Assert.assertNull(dataFile.lowerBounds());
Assert.assertNull(dataFile.upperBounds());
Assert.assertNull(dataFile.nanValueCounts());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: to minimize changes, no need to move line for this. Same comment for L206

assertNullRecordCount(dataFile);
}
}

Expand All @@ -149,19 +191,31 @@ private void assertFullStats(DataFile dataFile) {
Assert.assertNull(dataFile.columnSizes());
Assert.assertEquals(VALUE_COUNT, dataFile.valueCounts());
Assert.assertEquals(NULL_VALUE_COUNTS, dataFile.nullValueCounts());
Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());
Assert.assertEquals(LOWER_BOUNDS, dataFile.lowerBounds());
Assert.assertEquals(UPPER_BOUNDS, dataFile.upperBounds());
Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());

Assert.assertEquals(FILE_PATH, dataFile.path()); // always select file path in all test cases
}

private void assertStatsDropped(DataFile dataFile) {
Assert.assertEquals(3, dataFile.recordCount()); // always select record count in all test cases
Assert.assertEquals(3, dataFile.recordCount()); // record count is not considered as droppable stats
Assert.assertNull(dataFile.columnSizes());
Assert.assertNull(dataFile.valueCounts());
Assert.assertNull(dataFile.nullValueCounts());
Assert.assertNull(dataFile.lowerBounds());
Assert.assertNull(dataFile.upperBounds());
Assert.assertNull(dataFile.nanValueCounts());

Assert.assertEquals(FILE_PATH, dataFile.path()); // always select file path in all test cases
}

private void assertNullRecordCount(DataFile dataFile) {
// record count is a primitive type, accessing null record count will throw NPE
AssertHelpers.assertThrows(
"Should throw NPE when accessing non-populated record count field",
NullPointerException.class,
dataFile::recordCount);
}

}